## 47 工作實戰:Socket 結合線程池的使用
## 引導語
Socket 面試最終題一般都是讓你寫一個簡單的客戶端和服務端通信的例子,本文就帶大家一起來寫這個 demo。
### 1 要求
1. 可以使用 Socket 和 ServiceSocket 以及其它 API;
2. 寫一個客戶端和服務端之間 TCP 通信的例子;
3. 服務端處理任務需要異步處理;
4. 因為服務端處理能力很弱,只能同時處理 5 個請求,當第六個請求到達服務器時,需要服務器返回明確的錯誤信息:服務器太忙了,請稍后重試~。
需求比較簡單,唯一復雜的地方在于第四點,我們需要對客戶端的請求量進行控制,首先我們需要確認的是,我們是無法控制客戶端發送的請求數的,所以我們只能從服務端進行改造,比如從服務端進行限流。
有的同學可能很快想到,我們應該使用 ServerSocket 的 backlog 的屬性,把其設置成 5,但我們在上一章中說到 backlog 并不能準確代表限制的客戶端連接數,而且我們還要求服務端返回具體的錯誤信息,即使 backlog 生效,也只會返回固定的錯誤信息,不是我們定制的錯誤信息。
我們好好想想,線程池似乎可以做這個事情,我們可以把線程池的 coreSize 和 maxSize 都設置成 4,把隊列大小設置成 1,這樣服務端每次收到請求后,會先判斷一下線程池中的隊列有沒有數據,如果有的話,說明當前服務器已經馬上就要處理第五個請求了,當前請求就是第六個請求,應該被拒絕。
正好線程池的加入也可以滿足第三點,服務端的任務可以異步執行。
### 2 客戶端代碼
客戶端的代碼比較簡單,直接向服務器請求數據即可,代碼如下:
```
public class SocketClient { private static final Integer SIZE = 1024; private static final ThreadPoolExecutor socketPoll = new ThreadPoolExecutor(50, 50, 365L, TimeUnit.DAYS, new LinkedBlockingQueue<>(400)); @Test public void test() throws InterruptedException { // 模擬客戶端同時向服務端發送 6 條消息 for (int i = 0; i < 6; i++) { socketPoll.submit(() -> { send("localhost", 7007, "nihao"); }); } Thread.sleep(1000000000); } /** * 發送tcp * * @param domainName 域名 * @param port 端口 * @param content 發送內容 */ public static String send(String domainName, int port, String content) { log.info("客戶端開始運行");
Socket socket = null; OutputStream outputStream = null; InputStreamReader isr = null; BufferedReader br = null; InputStream is = null; StringBuffer response = null; try { if (StringUtils.isBlank(domainName)) { return null; } // 無參構造器初始化 Socket,默認底層協議是 TCP socket = new Socket(); socket.setReuseAddress(true); // 客戶端準備連接服務端,設置超時時間 10 秒 socket.connect(new InetSocketAddress(domainName, port), 10000); log.info("TCPClient 成功和服務端建立連接"); // 準備發送消息給服務端 outputStream = socket.getOutputStream(); // 設置 UTF 編碼,防止亂碼 byte[] bytes = content.getBytes(Charset.forName("UTF-8")); // 輸出字節碼 segmentWrite(bytes, outputStream); // 關閉輸出 socket.shutdownOutput(); log.info("TCPClient 發送內容為 {}",content); // 等待服務端的返回 socket.setSoTimeout(50000);//50秒還沒有得到數據,直接斷開連接 // 得到服務端的返回流 is = socket.getInputStream(); isr = new InputStreamReader(is); br = new BufferedReader(isr);
// 從流中讀取返回值 response = segmentRead(br); // 關閉輸入流 socket.shutdownInput(); //關閉各種流和套接字 close(socket, outputStream, isr, br, is); log.info("TCPClient 接受到服務端返回的內容為 {}",response); return response.toString(); } catch (ConnectException e) { log.error("TCPClient-send socket連接失敗", e); throw new RuntimeException("socket連接失敗"); } catch (Exception e) { log.error("TCPClient-send unkown errror", e); throw new RuntimeException("socket連接失敗"); } finally { try { close(socket, outputStream, isr, br, is); } catch (Exception e) { // do nothing } } } /** * 關閉各種流 * * @param socket * @param outputStream * @param isr * @param br * @param is
* @throws IOException */ public static void close(Socket socket, OutputStream outputStream, InputStreamReader isr, BufferedReader br, InputStream is) throws IOException { if (null != socket && !socket.isClosed()) { try { socket.shutdownOutput(); } catch (Exception e) { } try { socket.shutdownInput(); } catch (Exception e) { } try { socket.close(); } catch (Exception e) { } } if (null != outputStream) { outputStream.close(); } if (null != br) { br.close(); } if (null != isr) { isr.close(); } if (null != is) { is.close(); } }
/** * 分段讀 * * @param br * @throws IOException */ public static StringBuffer segmentRead(BufferedReader br) throws IOException { StringBuffer sb = new StringBuffer(); String line; while ((line = br.readLine()) != null) { sb.append(line); } return sb; } /** * 分段寫 * * @param bytes * @param outputStream * @throws IOException */ public static void segmentWrite(byte[] bytes, OutputStream outputStream) throws IOException { int length = bytes.length; int start, end = 0; for (int i = 0; end != bytes.length; i++) { start = i == 0 ? 0 : i * SIZE; end = length > SIZE ? start + SIZE : bytes.length; length -= SIZE; outputStream.write(bytes, start, end - start); outputStream.flush(); }
} }
```
客戶端代碼中我們也用到了線程池,主要是為了并發模擬客戶端一次性發送 6 個請求,按照預期服務端在處理第六個請求的時候,會返回特定的錯誤信息給客戶端。
以上代碼主要方法是 send 方法,主要處理像服務端發送數據,并處理服務端的響應。
### 3 服務端代碼
服務端的邏輯分成兩個部分,第一部分是控制客戶端的請求個數,當超過服務端的能力時,拒絕新的請求,當服務端能力可響應時,放入新的請求,第二部分是服務端任務的執行邏輯。
#### 3.1 對客戶端請求進行控制
```
public class SocketServiceStart { /** * 服務端的線程池,兩個作用 * 1:讓服務端的任務可以異步執行 * 2:管理可同時處理的服務端的請求數 */ private static final ThreadPoolExecutor collectPoll = new ThreadPoolExecutor(4, 4, 365L, TimeUnit.DAYS, new LinkedBlockingQueue<>( 1));
@Test public void test(){ start(); } /** * 啟動服務端 */ public static final void start() { log.info("SocketServiceStart 服務端開始啟動"); try { // backlog serviceSocket處理阻塞時,客戶端最大的可創建連接數,超過客戶端連接不上 // 當線程池能力處理滿了之后,我們希望盡量阻塞客戶端的連接 // ServerSocket serverSocket = new ServerSocket(7007,1,null); // 初始化服務端 ServerSocket serverSocket = new ServerSocket(); serverSocket.setReuseAddress(true); // serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), 80)); serverSocket.bind(new InetSocketAddress("localhost", 7007)); log.info("SocketServiceStart 服務端啟動成功"); // 自旋,讓客戶端一直在取客戶端的請求,如果客戶端暫時沒有請求,會一直阻塞 while (true) { // 接受客戶端的請求 Socket socket = serverSocket.accept(); // 如果隊列中有數據了,說明服務端已經到了并發處理的極限了,此時需要返回客戶端有意義的信息 if (collectPoll.getQueue().size() >= 1) { log.info("SocketServiceStart 服務端處理能力到頂,需要控制客戶端的請求"); //返回處理結果給客戶端 rejectRequest(socket);
continue; } try { // 異步處理客戶端提交上來的任務 collectPoll.submit(new SocketService(socket)); } catch (Exception e) { socket.close(); } } } catch (Exception e) { log.error("SocketServiceStart - start error", e); throw new RuntimeException(e); } catch (Throwable e) { log.error("SocketServiceStart - start error", e); throw new RuntimeException(e); } } // 返回特定的錯誤碼給客戶端 public static void rejectRequest(Socket socket) throws IOException { OutputStream outputStream = null; try{ outputStream = socket.getOutputStream(); byte[] bytes = "服務器太忙了,請稍后重試~".getBytes(Charset.forName("UTF-8")); SocketClient.segmentWrite(bytes, outputStream); socket.shutdownOutput(); }finally { //關閉流 SocketClient.close(socket,outputStream,null,null,null); } } }
```
我們使用 collectPoll.getQueue().size() >= 1 來判斷目前服務端是否已經到達處理的極限了,如果隊列中有一個任務正在排隊,說明當前服務端已經超負荷運行了,新的請求應該拒絕掉,如果隊列中沒有數據,說明服務端還可以接受新的請求。
以上代碼注釋詳細,就不累贅說了。
#### 3.2 服務端任務的處理邏輯
服務端的處理邏輯比較簡單,主要步驟是:從客戶端的 Socket 中讀取輸入,進行處理,把響應返回給客戶端。
我們使用線程沉睡 2 秒來模擬服務端的處理邏輯,代碼如下:
```
public class SocketService implements Runnable { private Socket socket; public SocketService() { } public SocketService(Socket socket) { this.socket = socket; } @Override public void run() { log.info("SocketService 服務端任務開始執行"); OutputStream outputStream = null; InputStream is = null; InputStreamReader isr = null; BufferedReader br = null; try {
//接受消息 socket.setSoTimeout(10000);// 10秒還沒有得到數據,直接斷開連接 is = socket.getInputStream(); isr = new InputStreamReader(is,"UTF-8"); br = new BufferedReader(isr); StringBuffer sb = SocketClient.segmentRead(br); socket.shutdownInput(); log.info("SocketService accept info is {}", sb.toString()); //服務端處理 模擬服務端處理耗時 Thread.sleep(2000); String response = sb.toString(); //返回處理結果給客戶端 outputStream = socket.getOutputStream(); byte[] bytes = response.getBytes(Charset.forName("UTF-8")); SocketClient.segmentWrite(bytes, outputStream); socket.shutdownOutput(); //關閉流 SocketClient.close(socket,outputStream,isr,br,is); log.info("SocketService 服務端任務執行完成"); } catch (IOException e) { log.error("SocketService IOException", e); } catch (Exception e) { log.error("SocketService Exception", e); } finally { try { SocketClient.close(socket,outputStream,isr,br,is); } catch (IOException e) { log.error("SocketService IOException", e); }
} } }
```
### 4 測試
測試的時候,我們必須先啟動服務端,然后再啟動客戶端,首先我們啟動服務端,打印日志如下:

接著我們啟動客戶端,打印日志如下:

我們最后看一下服務端的運行日志:

從以上運行結果中,我們可以看出得出的結果是符合我們預期的,服務端在請求高峰時,能夠并發處理5個請求,其余請求可以用正確的提示進行拒絕。
- 前言
- 第1章 基礎
- 01 開篇詞:為什么學習本專欄
- 02 String、Long 源碼解析和面試題
- 03 Java 常用關鍵字理解
- 04 Arrays、Collections、Objects 常用方法源碼解析
- 第2章 集合
- 05 ArrayList 源碼解析和設計思路
- 06 LinkedList 源碼解析
- 07 List 源碼會問哪些面試題
- 08 HashMap 源碼解析
- 09 TreeMap 和 LinkedHashMap 核心源碼解析
- 10 Map源碼會問哪些面試題
- 11 HashSet、TreeSet 源碼解析
- 12 彰顯細節:看集合源碼對我們實際工作的幫助和應用
- 13 差異對比:集合在 Java 7 和 8 有何不同和改進
- 14 簡化工作:Guava Lists Maps 實際工作運用和源碼
- 第3章 并發集合類
- 15 CopyOnWriteArrayList 源碼解析和設計思路
- 16 ConcurrentHashMap 源碼解析和設計思路
- 17 并發 List、Map源碼面試題
- 18 場景集合:并發 List、Map的應用場景
- 第4章 隊列
- 19 LinkedBlockingQueue 源碼解析
- 20 SynchronousQueue 源碼解析
- 21 DelayQueue 源碼解析
- 22 ArrayBlockingQueue 源碼解析
- 23 隊列在源碼方面的面試題
- 24 舉一反三:隊列在 Java 其它源碼中的應用
- 25 整體設計:隊列設計思想、工作中使用場景
- 26 驚嘆面試官:由淺入深手寫隊列
- 第5章 線程
- 27 Thread 源碼解析
- 28 Future、ExecutorService 源碼解析
- 29 押寶線程源碼面試題
- 第6章 鎖
- 30 AbstractQueuedSynchronizer 源碼解析(上)
- 31 AbstractQueuedSynchronizer 源碼解析(下)
- 32 ReentrantLock 源碼解析
- 33 CountDownLatch、Atomic 等其它源碼解析
- 34 只求問倒:連環相扣系列鎖面試題
- 35 經驗總結:各種鎖在工作中使用場景和細節
- 36 從容不迫:重寫鎖的設計結構和細節
- 第7章 線程池
- 37 ThreadPoolExecutor 源碼解析
- 38 線程池源碼面試題
- 39 經驗總結:不同場景,如何使用線程池
- 40 打動面試官:線程池流程編排中的運用實戰
- 第8章 Lambda 流
- 41 突破難點:如何看 Lambda 源碼
- 42 常用的 Lambda 表達式使用場景解析和應用
- 第9章 其他
- 43 ThreadLocal 源碼解析
- 44 場景實戰:ThreadLocal 在上下文傳值場景下的實踐
- 45 Socket 源碼及面試題
- 46 ServerSocket 源碼及面試題
- 47 工作實戰:Socket 結合線程池的使用
- 第10章 專欄總結
- 48 一起看過的 Java 源碼和面試真題