## **并發隊列**
* 非阻塞式隊列:ConcurrentLinkedQueue
* 阻塞式隊列:BlockingQueue
## **非阻塞式隊列**
ConcurrentLinkedQueue:是一個適用于高并發場景下的隊列,通過無鎖的方式,實現了高并發狀態下的高性能,它是一個基于鏈接節點的**無界**線程安全隊列。該隊列的元素遵循**先進先出**的原則,該隊列不允許null元素
```
ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
queue.offer("A");
queue.offer("B");
queue.offer("C");
//從頭獲取元素,刪除該元素
System.out.println(queue.poll());
//從頭獲取元素,不刪除該元素
System.out.println(queue.peek());
//獲取總長度
System.out.println(queue.size());
```
## **阻塞式隊列**
BlockingQueue:隊列容器滿時生產者線程阻塞等待、隊列容器空時消費者線程阻塞等待
常用的實現類
* ArrayBlockingQueue:有邊界的阻塞隊列,需初始化容量大小,內部實現是一個數組
```
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
```
* LinkedBlockingQueue:初始化容量則為有邊界阻塞隊列、否則就是無邊界隊列(默認Integer.MAX\_VALUE)的容量,內部實現是一個鏈表
```
LinkedBlockingQueue queue = new LinkedBlockingQueue(3);
```
* SynchronousQueue:隊列內部僅允許容納一個元素
## **BlockingQueue模擬生產者與消費者**
1. 生產者
```
class Producer implements Runnable {
private BlockingQueue<String> blockingQueue;
private AtomicInteger count = new AtomicInteger();
private volatile boolean flag = true;
public Producer(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (flag) {
String data = count.incrementAndGet() + "";
try {
boolean success = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
if (success) {
System.out.println("生產成功 data:" + data);
} else {
System.out.println("生產失敗 data:" + data);
}
Thread.sleep(1000);
} catch (Exception e) {
}
}
}
public void stop() {
this.flag = false;
}
}
```
2.消費者
```
class Consumer implements Runnable {
private volatile boolean flag = true;
private BlockingQueue<String> blockingQueue;
public Consumer(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (flag) {
try {
String data = blockingQueue.poll(2, TimeUnit.SECONDS);
if (data == null) {
flag= false;
System.out.println("消費者超過2秒時間未獲取到消息.");
return;
}
System.out.println("消費成功 data:" + data);
} catch (Exception e) {
}
}
}
}
```
3.測試
```
class test {
public static void main(String[] args) {
LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(3);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
Thread p = new Thread(producer);
Thread c = new Thread(consumer);
p.start();
c.start();
try {
Thread.sleep(1000 * 5);
producer.stop();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
```
運行結果
```
生產成功 data:1
消費成功 data:1
生產成功 data:2
消費成功 data:2
生產成功 data:3
消費成功 data:3
生產成功 data:4
消費成功 data:4
生產成功 data:5
消費成功 data:5
消費者超過2秒時間未獲取到消息.
```