第8篇筆記在引入join使用場景的時候,有個信息采集功能的案例:有若干臺采集服務器,然后還有一臺主機,這臺主機需要等待這若干臺服務器信息采集完之后再做進一步的處理,一臺采集服務器就對應一個線程,如之前寫的代碼:
```java
public class ThreadJoin3 {
public static void main(String[] args) throws InterruptedException {
long startTimestamp = System.currentTimeMillis();
// 假設有三臺機器,開啟三個線程。
Thread m1 = new Thread(new CaptureRunnable("M1", 1_000L));
Thread m2 = new Thread(new CaptureRunnable("M2", 2_000L));
Thread m3 = new Thread(new CaptureRunnable("M3", 3_000L));
m1.start();
m2.start();
m3.start();
m1.join();
m2.join();
m3.join();
long endTimestamp = System.currentTimeMillis();
System.out.printf("Save data begin timestamp is %s, end timestamp is %s\n", startTimestamp, endTimestamp);
System.out.printf("Spend time is %s", endTimestamp - startTimestamp);
}
}
/**
* 采集服務器節點的任務。
*/
class CaptureRunnable implements Runnable {
// 機器節點的名稱
private String machineName;
// 采集花費時間
private long spendTime;
public CaptureRunnable(String machineName, long spendTime) {
this.machineName = machineName;
this.spendTime = spendTime;
}
@Override
public void run() {
// do the really capture data.
try {
Thread.sleep(spendTime);
System.out.printf(machineName + " completed data capture at timestamp [%s] and successful.\n", System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String getResult() {
return machineName + " finish.";
}
}
```
為了獲取三個線程統一的結束時間,使用了join,這是三個線程三個服務器的情況,如果有成千上萬臺那怎么辦,不可能每臺服務器對應一個線程,因為線程有一個stackSize的上限,此時就需要用到線程同步來避免這個問題了,我們一步一步完善這個功能:
首先創建十個線程,這里采用流的方式來創建,如下:
```java
public class CaptureService {
public static void main(String[] args) {
Stream.of("M1", "M2", "M3", "M4", "M5", "M6", "M7", "M8", "M9", "M10")
.map(CaptureService::createCaptureService)
.forEach(t -> {
t.start();
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Optional.of("All of capture work finished.").ifPresent(System.out::println);
}
public static Thread createCaptureService(String name) {
return new Thread(() -> {
//TODO
System.out.println(Thread.currentThread().getName());
}, name);
}
}
```
運行效果如下:

可以看到線程都跑起來了,但是這樣實際是有問題的,因為寫在foreach里是只等待當前線程執行完了而不是所有的線程執行完。

這里使用java8的Stream.forEach(),因為這個方法是一個teminal operation,這個后續再深入學習,先知道有這么個東西,運行效果如下:

接下來完善createCaptureService:
1. 我們在CaptureService增加一個MAX_WORKER來代表最大執行線程數。
```java
private static final int MAX_WORKER = 5;
```
2. 定義一個class,作為執行代碼鎖的控制器Control。然后放在一個鏈表里。
```java
private static final LinkedList<Control> CONTROLS = new LinkedList<>();
```
3. 如果鏈表的長度大于最大執行線程數,就wait();
```java
while (CONTROLS.size() > MAX_WORKER) {
try {
CONTROLS.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
```
4. 如果鏈表長度小于最大執行線程數,就代表鏈表里還有空間,那么就在鏈表里增加一個Control的對象。
```java
CONTROLS.addLast(new Control());
```
5. 然后寫采集的業務,采集業務執行后,先把鏈表里的第一個移除。也就是先進先出。同時notifyAll()wait的線程,這時候一個線程搶導鎖就會繼續執行上面的4步的操作,依次類推下去:
```java
synchronized (CONTROLS) {
Optional.of("The worker [" + Thread.currentThread().getName() + "] END capture data.").ifPresent(System.out::println);
CONTROLS.removeFirst();
CONTROLS.notifyAll();
}
```
整體代碼如下:
```java
/**
* @program: ThreadDemo
* @description: 數據采集功能:利用多個線程采集多臺服務器運行狀態信息。
* 當服務器數量較少時,可以采取一個線程采集一臺服務器;
* 但是服務器數量非常大時,將不可能采取這種方式。
* 可以開啟一定數量的線程采集完成后再采集其他服務器,即運行的線程始終保持著穩定數量。
* @author: hs96.cn@Gmail.com
* @create: 2020-09-08
*/
public class CaptureService {
private static final int MAX_WORKER = 5;
private static final LinkedList<Control> CONTROLS = new LinkedList<>();
public static void main(String[] args) {
List<Thread> worker = new ArrayList<>();
Stream.of("M1", "M2", "M3", "M4", "M5", "M6", "M7", "M8", "M9", "M10")
.map(CaptureService::createCaptureService)
.forEach(t -> {
t.start();
worker.add(t);
});
worker.stream().forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Optional.of("All of capture work finished.").ifPresent(System.out::println);
}
public static Thread createCaptureService(String name) {
return new Thread(() -> {
// Optional可以防止NPE空指針異常
Optional.of("The worker [" + Thread.currentThread().getName() + "] BEGIN capture data.").ifPresent(System.out::println);
synchronized (CONTROLS) {
while (CONTROLS.size() > MAX_WORKER) {
try {
CONTROLS.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
CONTROLS.addLast(new Control());
}
Optional.of("The worker [" + Thread.currentThread().getName() + "] is WORKING...").ifPresent(System.out::println);
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (CONTROLS) {
Optional.of("The worker [" + Thread.currentThread().getName() + "] END capture data.").ifPresent(System.out::println);
CONTROLS.removeFirst();
CONTROLS.notifyAll();
}
}, name);
}
private static class Control{
}
}
```
運行結果如下:

- 微服務
- 服務器相關
- 操作系統
- 極客時間操作系統實戰筆記
- 01 程序的運行過程:從代碼到機器運行
- 02 幾行匯編幾行C:實現一個最簡單的內核
- 03 黑盒之中有什么:內核結構與設計
- Rust
- 入門:Rust開發一個簡單的web服務器
- Rust的引用和租借
- 函數與函數指針
- Rust中如何面向對象編程
- 構建單線程web服務器
- 在服務器中增加線程池提高吞吐
- Java
- 并發編程
- 并發基礎
- 1.創建并啟動線程
- 2.java線程生命周期以及start源碼剖析
- 3.采用多線程模擬銀行排隊叫號
- 4.Runnable接口存在的必要性
- 5.策略模式在Thread和Runnable中的應用分析
- 6.Daemon線程的創建以及使用場景分析
- 7.線程ID,優先級
- 8.Thread的join方法
- 9.Thread中斷Interrupt方法學習&采用優雅的方式結束線程生命周期
- 10.編寫ThreadService實現暴力結束線程
- 11.線程同步問題以及synchronized的引入
- 12.同步代碼塊以及同步方法之間的區別和關系
- 13.通過實驗分析This鎖和Class鎖的存在
- 14.多線程死鎖分析以及案例介紹
- 15.線程間通信快速入門,使用wait和notify進行線程間的數據通信
- 16.多Product多Consumer之間的通訊導致出現程序假死的原因分析
- 17.使用notifyAll完善多線程下的生產者消費者模型
- 18.wait和sleep的本質區別
- 19.完善數據采集程序
- 20.如何實現一個自己的顯式鎖Lock
- 21.addShutdownHook給你的程序注入鉤子
- 22.如何捕獲線程運行期間的異常
- 23.ThreadGroup API介紹
- 24.線程池原理與自定義線程池一
- 25.給線程池增加拒絕策略以及停止方法
- 26.給線程池增加自動擴充,閑時自動回收線程的功能
- JVM
- C&C++
- GDB調試工具筆記
- C&C++基礎
- 一個例子理解C語言數據類型的本質
- 字節順序-大小端模式
- Php
- Php源碼閱讀筆記
- Swoole相關
- Swoole基礎
- php的五種運行模式
- FPM模式的生命周期
- OSI網絡七層圖片速查
- IP/TCP/UPD/HTTP
- swoole源代碼編譯安裝
- 安全相關
- MySql
- Mysql基礎
- 1.事務與鎖
- 2.事務隔離級別與IO的關系
- 3.mysql鎖機制與結構
- 4.mysql結構與sql執行
- 5.mysql物理文件
- 6.mysql性能問題
- Docker&K8s
- Docker安裝java8
- Redis
- 分布式部署相關
- Redis的主從復制
- Redis的哨兵
- redis-Cluster分區方案&應用場景
- redis-Cluster哈希虛擬槽&簡單搭建
- redis-Cluster redis-trib.rb 搭建&原理
- redis-Cluster集群的伸縮調優
- 源碼閱讀筆記
- Mq
- ELK
- ElasticSearch
- Logstash
- Kibana
- 一些好玩的東西
- 一次折騰了幾天的大華攝像頭調試經歷
- 搬磚實用代碼
- python讀取excel拼接sql
- mysql大批量插入數據四種方法
- composer好用的鏡像源
- ab
- 環境搭建與配置
- face_recognition本地調試筆記
- 虛擬機配置靜態ip
- Centos7 Init Shell
- 發布自己的Composer包
- git推送一直失敗怎么辦
- Beyond Compare過期解決辦法
- 我的Navicat for Mysql
- 小錯誤解決辦法
- CLoin報錯CreateProcess error=216
- mysql error You must reset your password using ALTER USER statement before executing this statement.
- VM無法連接到虛擬機
- Jetbrains相關
- IntelliJ IDEA 筆記
- CLoin的配置與使用
- PhpStormDocker環境下配置Xdebug
- PhpStorm advanced metadata
- PhpStorm PHP_CodeSniffer