[toc]
## 一、鏈路保持
鏈路保持流程應遵循以下規定:
1. 下級平臺登錄成功后,在與上級平臺之間如果有應用業務數據包往來的情況下,不需要發送鏈路保持數據包;否則,下級平臺應每隔1min發送一個鏈路保持請求數據包到上級平臺以保持鏈路連接;
2. 在沒有應用數據包往來的情況下,上級平臺連續3min未收到下級平臺發送的鏈路保持請求數據包,則認為與下級平臺的連接中斷,將主動斷開數據傳輸鏈路;
3. 在沒有應用數據包往來的情況下,下級平臺連續3min未收到上級平臺發送的鏈路保持應答數據包,則認為與上級平臺的連接中斷,將主動斷開數據傳輸從鏈路。
解析: 第一條和第三條都是關于客戶端的實現。第二條是服務端的實現。
## 二、NIO長連接ChannelServer
長連接的實現方案:
- 基于Socket+多線程;
- Java NIO;
- Mina;
- Netty;
- QuickServer。
本案例,基于性能+簡潔的考慮,我們基于Java的NIO實現長連接服務端。
### 代碼結構
```
location-center
|-- src
|-- main
|-- java
|-- com.zihan.location.parser
|-- AuthHelper.java
|-- LocationParserHelper.java
|-- ByteMeta.java
|-- socket
|-- alive
|-- ChannelServer.java ## 長連接socketServer
|-- Protocol.java ## socket操作
|-- ProtocolImpl.java ## socket操作實現
```
### NIO服務端開發步驟
寫代碼之前,我們先總結一下使用Java NIO進行服務端開發的步驟:
1. 創建ServerSocketChannel,配置它為非阻塞模式;
2. 綁定監聽,配置TCP參數,例如backlog大小;
3. 創建一個獨立的I/O線程,用于輪詢多路復用器Selector;
4. 創建Selector,將之前創建的ServerSocketChannel注冊到Selector上,監聽`SelectionKey.ACCEPT`;
5. 啟動I/O線程,在循環體中執行Selector.select()方法,輪詢就緒的Channel;
6. 當輪詢到了處于就緒狀態的Channel時,需要對其進行判斷,如果是`OP_ACCEPT`狀態,說明是新的客戶端接入,則調用`ServerSocketChannelaccept()`方法接受新的客戶端;
7. 設置新接入的客戶端鏈路SocketChannel為非阻塞模式,配置其他的一些TCP參數;
8. 將SocketChannel注冊到Selector,監聽`OP_READ`操作位;
9. 如果輪詢的Channel為OP_READ,則說明SocketChannel中有新的就緒的數據包需要讀取,則構造ByteBuffer對象,讀取數據包;
10. 如果輪詢的Channel為`OP_WRITE`,說明還有數據沒有發送完成,需要繼續發送。
```java
@Component
@Slf4j
public class ChannelServer {
private static final int BUFFSIZE=1024;
private static final int TIMEOUT= 180000;
private static final int PORT = 8089;
@Resource
private Protocol protocol;////創建處理協議類型
public void sendMessagestartSocketServer() {
try {
Selector selector = Selector.open();//創建選擇器
//打開監聽信道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
//與端口綁定
serverChannel.socket().bind(new InetSocketAddress(PORT));
//設置非阻塞模式
serverChannel.configureBlocking(false);
//將選擇器注冊到監聽信道,只有非阻塞信道才可以注冊,并指出該信道可以Accept
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
//等待某信道就緒(或超時)
if(selector.select(TIMEOUT)==0){
log.info("無客戶端連接,系統等待中");
continue;
}
//取得迭代器selectedKey()中包含了每個準備好某一操作信道的selectionKey
Iterator it = selector.selectedKeys().iterator();
while(it.hasNext()){
SelectionKey key = (SelectionKey) it.next();
try{
if(key.isAcceptable()){
//有客戶端請求時
//接受請求并處理 處理該key
protocol.handleAccept(key);
}
if(key.isValid() && key.isReadable()){
//從客戶端取數據
protocol.handleRead(key);
}
// if(key.isValid() && key.isWritable()){
// //客戶端可寫時進行發送
// Thread.sleep(1000);
// if (response.length > 0){
// protocol.handlWrite(key, ByteBuffer.wrap(response));
// }
// }
}catch(Exception e){
it.remove();
continue;
}
it.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
```
### Protocol接口
Protocol接口,定義了socket服務端在讀、寫狀態時,可以進行的幾種操作。
- 客戶端連接時,注冊一個通道到選擇器;
- 通道狀態為可讀時,讀取緩沖區中的數據;
- 通道狀態為可寫時,向寫緩沖區寫入數據。
```java
public interface Protocol {
void handleAccept(SelectionKey skey) throws IOException;
void handleRead(SelectionKey skey) throws IOException;
void handlWrite(SelectionKey skey, ByteBuffer bf) throws IOException;
}
```
Protocol接口實現:
```
@Slf4j
@Service
public class ProtocolImpl implements Protocol {
@Resource
private ReportLogService reportLogService;
private int buffSize = 2048;
/**
* 接收一個socketChannel的處理
*/
@Override
public void handleAccept(SelectionKey skey) throws IOException {
// TODO Auto-generated method stub
SocketChannel clientChannel = ((ServerSocketChannel)skey.channel()).accept();
clientChannel.configureBlocking(false);
clientChannel.register(skey.selector(), SelectionKey.OP_READ,
ByteBuffer.allocate(buffSize));
log.info("clientChannel"+clientChannel);
}
/**
* 向一個socketChannel寫入
*/
@Override
public void handlWrite(SelectionKey skey , ByteBuffer bf) throws IOException {
// TODO Auto-generated method stub
SocketChannel clientChannel = (SocketChannel) skey.channel();
clientChannel.write(bf);
skey.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
/**
* 接收一個socketChannel的處理
*/
@Override
public void handleRead(SelectionKey skey) throws IOException {
// 獲得客戶端通訊信道
SocketChannel clientChannel = (SocketChannel) skey.channel();
log.info("數據讀取中:"+clientChannel);
// 得到清空緩沖區
ByteBuffer bf = (ByteBuffer) skey.attachment();
bf.clear();
long bytesRead;
try {
bytesRead = clientChannel.read(bf);
}catch (IOException e){
skey.cancel();
clientChannel.socket().close();
clientChannel.close();
return;
}
if (bytesRead == -1) {
clientChannel.close();
} else {
bf.flip();
byte[] bytes = bf.array();
log.info("服務器收到來自"+clientChannel.getRemoteAddress()+"的消息:"+ByteUtil.bytesToHex(bytes));
this.packageParse(bytes,clientChannel);
//根據bytes完成業務解析功能
skey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
}
}
```
## 三、粘包與拆包
客戶端為了提高Socket通訊的效率,往往在開發過程中,不是有一條消息就發送一條請求。而是,緩沖區快滿的時候再把數據發送出去。因此,我們會遇到粘包、拆包的一些情形。
**什么是粘包?**
簡單來講,就是發送端需要等緩沖區滿才發送出去,這樣發送數據包就是粘包。
因此,我們要對粘包按照協議規定的頭、尾信息進行拆分。然后再通過遍歷的方式,分別處理數據包中的業務請求。
```
private void packageParse(byte[] bytes,SocketChannel clientChannel) throws IOException {
//解決粘包問題
byte[] response = new byte[0];
//整包解析
List<byte[]> list = this.packageCut(bytes);
for (byte[] item : list){
try {
response = ArrayUtils.addAll(response,LocationParserHelper.buildResponse(item,clientChannel));
} catch (Exception e) {
log.error(e.getMessage());
e.printStackTrace();
response = ArrayUtils.addAll(response,ResponseHelper.error());
}
}
clientChannel.write(ByteBuffer.wrap(response));
//記錄日志
this.saveLog(bytes,clientChannel,response);
}
/**
* 拆包
* @param bytes
* @return
*/
private List<byte[]> packageCut(byte[] bytes){
List<byte[]> list = new ArrayList<>();
int start =0 ,end = 0;
for (int i = 0; i<bytes.length ;i++){
byte item = bytes[i];
if (item == MsgConfig.getSTARTER()){
start = i;
i += MsgConfig.getHeaderLength();//提升解包效率
}else if (item == MsgConfig.getEND()){
end = i;
if (end > start) {
log.info("拆包成功:解析到1條數據,start=" + start + ",end=" + end + ",i=" + i);
byte[] sub = ByteUtil.subBytes(bytes, start, end - start + 1);
String s = ByteUtil.bytesToHex(sub);
if (!s.startsWith("5b")){
log.info("拆包得到異常包:" + s);
}else {
log.info("拆包得到數據為:" + ByteUtil.bytesToHex(sub));
list.add(sub);
}
start = end + 1;
}
}
}
log.info("拆包成功,得到" + list.size() + "條數據");
return list;
}
```
## 四、日志記錄
另外,這個通訊框架中我們還可以加入日志的功能,把通訊過程記錄作完整的記錄。
### 數據庫設計
```
CREATE TABLE `loc_report_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`ip` varchar(20) DEFAULT NULL COMMENT '客戶端ip地址',
`msg_body` text COMMENT '消息體',
`msg_response` text COMMENT '消息回復',
`create_time` datetime DEFAULT NULL COMMENT '創建時間',
`update_time` datetime DEFAULT NULL COMMENT '更新時間',
`deleted` int(1) DEFAULT '0' COMMENT '是否刪除:0-否;1-已刪除',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=50577 DEFAULT CHARSET=utf8mb4
```
```
/**
* 記錄同步日志信息
* TODO:改成AOP方式
* @param bytes
* @param clientChannel
* @param response
*/
private void saveLog (byte[] bytes,SocketChannel clientChannel,byte[] response){
ReportLog reportLog = new ReportLog();
try {
reportLog.setIp(clientChannel.getRemoteAddress().toString().replace("/",""));
reportLog.setMsgBody(ByteUtil.bytesToHex(bytes));
reportLog.setMsgResponse(ByteUtil.bytesToHex(response));
reportLogService.insert(reportLog);
}catch (Exception e){
e.printStackTrace();
log.warn("日志存儲失敗");
}
}
```
- 第一章 開篇寄語
- 1-1 技術選型要點
- 1-2 認識905.4王國的交流規范
- 1-3 聯系作者
- 第二章 Socket編程的基礎知識
- 2-1 Socket家族的基石
- 2-2 byte數組基礎
- 2-3 緩沖區基礎
- 2-4 NIO Socket通訊的工作原理
- 第三章 905.4規范解讀
- 3-1 基于通道選擇器的Socket長連接及消息讀寫框架
- 3-2 嚴格的信件收發員
- 3-3 負責消息處理的一家子
- 3-4 負責認證的大兒子(AuthWorker)
- 3-5 啞巴老二(PingWoker)
- 3-6 勤奮的定位匯報員老三(LocationReportWorker)
- 3-7 精明的老四(BusinessReportWorker)
- 3-8 數據檢察官——CRC16-CCITT校驗
- 3-11 數據的加密官
- 3-12 頭尾標識轉義
- 第四章 測試方法
- 4-1 測試數據樣例
- 4-2 客戶端鏈路保持功能實現
- 4-3 使用Socket短連接進行功能測試
- 4-4 NIO服務端性能分析
- 4-5 http測試方法(推薦)
- 第五章 從NIO到netty
- 5-1 編程進階——Netty核心基礎
- 5-2 Netty使用常見問題
- 5-3 使用Netty重寫Server端
- 5-4 Netty之鏈路管理
- 5-5 netty堆外內存泄漏如何應對?
- 第六章 統計與監控
- 6-1 Grafana監控面板
- 第七章 售后服務
- 7-1 勘誤與優化
- 7-2 獲取源碼