<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                [toc] ## 一、鏈路保持 鏈路保持流程應遵循以下規定: 1. 下級平臺登錄成功后,在與上級平臺之間如果有應用業務數據包往來的情況下,不需要發送鏈路保持數據包;否則,下級平臺應每隔1min發送一個鏈路保持請求數據包到上級平臺以保持鏈路連接; 2. 在沒有應用數據包往來的情況下,上級平臺連續3min未收到下級平臺發送的鏈路保持請求數據包,則認為與下級平臺的連接中斷,將主動斷開數據傳輸鏈路; 3. 在沒有應用數據包往來的情況下,下級平臺連續3min未收到上級平臺發送的鏈路保持應答數據包,則認為與上級平臺的連接中斷,將主動斷開數據傳輸從鏈路。 解析: 第一條和第三條都是關于客戶端的實現。第二條是服務端的實現。 ## 二、NIO長連接ChannelServer 長連接的實現方案: - 基于Socket+多線程; - Java NIO; - Mina; - Netty; - QuickServer。 本案例,基于性能+簡潔的考慮,我們基于Java的NIO實現長連接服務端。 ### 代碼結構 ``` location-center |-- src |-- main |-- java |-- com.zihan.location.parser |-- AuthHelper.java |-- LocationParserHelper.java |-- ByteMeta.java |-- socket |-- alive |-- ChannelServer.java ## 長連接socketServer |-- Protocol.java ## socket操作 |-- ProtocolImpl.java ## socket操作實現 ``` ### NIO服務端開發步驟 寫代碼之前,我們先總結一下使用Java NIO進行服務端開發的步驟: 1. 創建ServerSocketChannel,配置它為非阻塞模式; 2. 綁定監聽,配置TCP參數,例如backlog大小; 3. 創建一個獨立的I/O線程,用于輪詢多路復用器Selector; 4. 創建Selector,將之前創建的ServerSocketChannel注冊到Selector上,監聽`SelectionKey.ACCEPT`; 5. 啟動I/O線程,在循環體中執行Selector.select()方法,輪詢就緒的Channel; 6. 當輪詢到了處于就緒狀態的Channel時,需要對其進行判斷,如果是`OP_ACCEPT`狀態,說明是新的客戶端接入,則調用`ServerSocketChannelaccept()`方法接受新的客戶端; 7. 設置新接入的客戶端鏈路SocketChannel為非阻塞模式,配置其他的一些TCP參數; 8. 將SocketChannel注冊到Selector,監聽`OP_READ`操作位; 9. 如果輪詢的Channel為OP_READ,則說明SocketChannel中有新的就緒的數據包需要讀取,則構造ByteBuffer對象,讀取數據包; 10. 如果輪詢的Channel為`OP_WRITE`,說明還有數據沒有發送完成,需要繼續發送。 ```java @Component @Slf4j public class ChannelServer { private static final int BUFFSIZE=1024; private static final int TIMEOUT= 180000; private static final int PORT = 8089; @Resource private Protocol protocol;////創建處理協議類型 public void sendMessagestartSocketServer() { try { Selector selector = Selector.open();//創建選擇器 //打開監聽信道 ServerSocketChannel serverChannel = ServerSocketChannel.open(); //與端口綁定 serverChannel.socket().bind(new InetSocketAddress(PORT)); //設置非阻塞模式 serverChannel.configureBlocking(false); //將選擇器注冊到監聽信道,只有非阻塞信道才可以注冊,并指出該信道可以Accept serverChannel.register(selector, SelectionKey.OP_ACCEPT); while(true){ //等待某信道就緒(或超時) if(selector.select(TIMEOUT)==0){ log.info("無客戶端連接,系統等待中"); continue; } //取得迭代器selectedKey()中包含了每個準備好某一操作信道的selectionKey Iterator it = selector.selectedKeys().iterator(); while(it.hasNext()){ SelectionKey key = (SelectionKey) it.next(); try{ if(key.isAcceptable()){ //有客戶端請求時 //接受請求并處理 處理該key protocol.handleAccept(key); } if(key.isValid() && key.isReadable()){ //從客戶端取數據 protocol.handleRead(key); } // if(key.isValid() && key.isWritable()){ // //客戶端可寫時進行發送 // Thread.sleep(1000); // if (response.length > 0){ // protocol.handlWrite(key, ByteBuffer.wrap(response)); // } // } }catch(Exception e){ it.remove(); continue; } it.remove(); } } } catch (Exception e) { e.printStackTrace(); } } } ``` ### Protocol接口 Protocol接口,定義了socket服務端在讀、寫狀態時,可以進行的幾種操作。 - 客戶端連接時,注冊一個通道到選擇器; - 通道狀態為可讀時,讀取緩沖區中的數據; - 通道狀態為可寫時,向寫緩沖區寫入數據。 ```java public interface Protocol { void handleAccept(SelectionKey skey) throws IOException; void handleRead(SelectionKey skey) throws IOException; void handlWrite(SelectionKey skey, ByteBuffer bf) throws IOException; } ``` Protocol接口實現: ``` @Slf4j @Service public class ProtocolImpl implements Protocol { @Resource private ReportLogService reportLogService; private int buffSize = 2048; /** * 接收一個socketChannel的處理 */ @Override public void handleAccept(SelectionKey skey) throws IOException { // TODO Auto-generated method stub SocketChannel clientChannel = ((ServerSocketChannel)skey.channel()).accept(); clientChannel.configureBlocking(false); clientChannel.register(skey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(buffSize)); log.info("clientChannel"+clientChannel); } /** * 向一個socketChannel寫入 */ @Override public void handlWrite(SelectionKey skey , ByteBuffer bf) throws IOException { // TODO Auto-generated method stub SocketChannel clientChannel = (SocketChannel) skey.channel(); clientChannel.write(bf); skey.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE); } /** * 接收一個socketChannel的處理 */ @Override public void handleRead(SelectionKey skey) throws IOException { // 獲得客戶端通訊信道 SocketChannel clientChannel = (SocketChannel) skey.channel(); log.info("數據讀取中:"+clientChannel); // 得到清空緩沖區 ByteBuffer bf = (ByteBuffer) skey.attachment(); bf.clear(); long bytesRead; try { bytesRead = clientChannel.read(bf); }catch (IOException e){ skey.cancel(); clientChannel.socket().close(); clientChannel.close(); return; } if (bytesRead == -1) { clientChannel.close(); } else { bf.flip(); byte[] bytes = bf.array(); log.info("服務器收到來自"+clientChannel.getRemoteAddress()+"的消息:"+ByteUtil.bytesToHex(bytes)); this.packageParse(bytes,clientChannel); //根據bytes完成業務解析功能 skey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } } ``` ## 三、粘包與拆包 客戶端為了提高Socket通訊的效率,往往在開發過程中,不是有一條消息就發送一條請求。而是,緩沖區快滿的時候再把數據發送出去。因此,我們會遇到粘包、拆包的一些情形。 **什么是粘包?** 簡單來講,就是發送端需要等緩沖區滿才發送出去,這樣發送數據包就是粘包。 因此,我們要對粘包按照協議規定的頭、尾信息進行拆分。然后再通過遍歷的方式,分別處理數據包中的業務請求。 ``` private void packageParse(byte[] bytes,SocketChannel clientChannel) throws IOException { //解決粘包問題 byte[] response = new byte[0]; //整包解析 List<byte[]> list = this.packageCut(bytes); for (byte[] item : list){ try { response = ArrayUtils.addAll(response,LocationParserHelper.buildResponse(item,clientChannel)); } catch (Exception e) { log.error(e.getMessage()); e.printStackTrace(); response = ArrayUtils.addAll(response,ResponseHelper.error()); } } clientChannel.write(ByteBuffer.wrap(response)); //記錄日志 this.saveLog(bytes,clientChannel,response); } /** * 拆包 * @param bytes * @return */ private List<byte[]> packageCut(byte[] bytes){ List<byte[]> list = new ArrayList<>(); int start =0 ,end = 0; for (int i = 0; i<bytes.length ;i++){ byte item = bytes[i]; if (item == MsgConfig.getSTARTER()){ start = i; i += MsgConfig.getHeaderLength();//提升解包效率 }else if (item == MsgConfig.getEND()){ end = i; if (end > start) { log.info("拆包成功:解析到1條數據,start=" + start + ",end=" + end + ",i=" + i); byte[] sub = ByteUtil.subBytes(bytes, start, end - start + 1); String s = ByteUtil.bytesToHex(sub); if (!s.startsWith("5b")){ log.info("拆包得到異常包:" + s); }else { log.info("拆包得到數據為:" + ByteUtil.bytesToHex(sub)); list.add(sub); } start = end + 1; } } } log.info("拆包成功,得到" + list.size() + "條數據"); return list; } ``` ## 四、日志記錄 另外,這個通訊框架中我們還可以加入日志的功能,把通訊過程記錄作完整的記錄。 ### 數據庫設計 ``` CREATE TABLE `loc_report_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID', `ip` varchar(20) DEFAULT NULL COMMENT '客戶端ip地址', `msg_body` text COMMENT '消息體', `msg_response` text COMMENT '消息回復', `create_time` datetime DEFAULT NULL COMMENT '創建時間', `update_time` datetime DEFAULT NULL COMMENT '更新時間', `deleted` int(1) DEFAULT '0' COMMENT '是否刪除:0-否;1-已刪除', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=50577 DEFAULT CHARSET=utf8mb4 ``` ``` /** * 記錄同步日志信息 * TODO:改成AOP方式 * @param bytes * @param clientChannel * @param response */ private void saveLog (byte[] bytes,SocketChannel clientChannel,byte[] response){ ReportLog reportLog = new ReportLog(); try { reportLog.setIp(clientChannel.getRemoteAddress().toString().replace("/","")); reportLog.setMsgBody(ByteUtil.bytesToHex(bytes)); reportLog.setMsgResponse(ByteUtil.bytesToHex(response)); reportLogService.insert(reportLog); }catch (Exception e){ e.printStackTrace(); log.warn("日志存儲失敗"); } } ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看