[TOC]
# BlockingQueue簡介
消息隊列常用于有生產者和消費者兩類角色的多線程同步場景
BlockingQueue也是java.util.concurrent下的主要用來控制線程同步的工具。
主要的方法是:put、take一對阻塞存取;add、poll一對非阻塞存取。
**插入:**
1. add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則拋出異常
2. offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false.
3. put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻塞直到BlockingQueue里面有空間再繼續.
**讀取:**
4. poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null
5. take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的對象被加入為止
**其他**
`int remainingCapacity();`返回隊列剩余的容量,在隊列插入和獲取的時候,不要瞎搞,數據可能不準
`boolean remove(Object o); `從隊列移除元素,如果存在,即移除一個或者更多,隊列改變了返回true
`public boolean contains(Object o);` 查看隊列是否存在這個元素,存在返回true
`int drainTo(Collection<? super E> c); `傳入的集合中的元素,如果在隊列中存在,那么將隊列中的元素移動到集合中
`int drainTo(Collection<? super E> c, int maxElements);` 和上面方法的區別在于,制定了移動的數量
## 實現類
BlockingQueue有四個具體的實現類,常用的兩種實現類為:
1. ArrayBlockingQueue:一個由數組支持的有界阻塞隊列,規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的。
2. LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序的。
LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的話,
默認最大是Integer.MAX_VALUE,其中主要用到put和take方法,
put方法在隊列滿的時候會阻塞直到有隊列成員被消費,
take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。
LinkedBlockingQueue和ArrayBlockingQueue區別:
LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背后所用的數據結構不一樣,導致LinkedBlockingQueue的數據吞吐量要大于ArrayBlockingQueue,但在線程數量很大時其性能的可預見性低于ArrayBlockingQueue.
## 代碼
**隊列生產者**
~~~
package testThread;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
public class TestBlockingQueueProducer implements Runnable{
private final BlockingQueue<String> queue;
Random random = new Random();
//生產者
public TestBlockingQueueProducer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(random.nextInt(10));
String task = Thread.currentThread().getName() + " made a product " + i;
System.out.println(task);
//阻塞方法
queue.put(task);
}catch (Exception e) {
e.printStackTrace();
}
}
}
}
~~~
**隊列消費者**
~~~
package testThread;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
public class TestBlockingQueueConsumer implements Runnable {
private final BlockingQueue<String> queue;
Random random = new Random();
//消費者
public TestBlockingQueueConsumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
Thread.sleep(random.nextInt(10));
System.out.println(Thread.currentThread().getName() + " trying...");
//如果隊列為空會阻塞當前線程
String temp = queue.take();
//這個是不準的,又有人放又有人取
int remainingCapacity = queue.remainingCapacity();
System.out.println(Thread.currentThread().getName() + " get a job " + temp);
// System.out.println("隊列中的元素個數: "+ remainingCapacity);
} catch (Exception e) {
e.printStackTrace();
}
}
}
~~~
**測試**
~~~
package testThread;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestBlockingQueue {
public static void main(String[] args) {
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
// BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
// 不設置的話,LinkedBlockingQueue默認大小為Integer.MAX_VALUE
// BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
TestBlockingQueueConsumer consumer = new TestBlockingQueueConsumer(queue);
TestBlockingQueueProducer producer = new TestBlockingQueueProducer(queue);
for (int i = 0; i < 3; i++) {
new Thread(producer, "Producer" + (i + 1)).start();
}
for (int i = 0; i < 5; i++) {
new Thread(consumer, "Consumer" + (i + 1)).start();
}
// new Thread(producer, "Producer" + (5)).start();
}
}
~~~
## 成員介紹
### ArrayBlockingQueue
線程安全
一個由數組結構組成的有界阻塞隊列
基于數組實現的有界阻塞隊列,查找快,增刪慢
生產者和消費者用的是同一把鎖
消費的方式:FIFO
需求:想按照隊列順序去執行任務,還不想出現頻繁的GC現象
### LinkedBlockingQueue
線程安全
一個由鏈表結構組成的有界阻塞隊列
基于鏈表實現的阻塞隊列,鏈表是增刪快,定位慢
### DelayQueue
DelayQueue中的元素,只有指定的延遲時間到了,才能夠從隊列中獲取到該元素。
DelayQueue是一個沒有大小限制的隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞
應用場景:
1. 客戶端長時間占用連接的問題,超過這個空閑時間了,可以移除的
2. 處理長時間不用的緩存;如果隊列里面的對象長時間不用,超過了空閑時間,就移除
3. 任務超時處理
### PriorityBlockingQueue
線程安全
一個支持優先級排序的無界阻塞隊列
PriorityBlockingQueue并不會阻塞數據生產者,而只會在沒有可消費的數據時,阻塞數據的消費者
不阻塞生產者
阻塞消費者
### SynchronousQueue
?一種無緩沖的等待隊列,來一個任務就執行這個任務,這期間不能太添加任何的任務。也就是不用阻塞了,其實對于少量任務而言,這種做法更高效
聲明一個SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為。公平模式和非公平模式的區別:
如果采用公平模式:SynchronousQueue會采用公平鎖,并配合一個FIFO隊列來阻塞多余的生產者和消費者,從而體系整體的公平策略;
但如果是非公平模式(SynchronousQueue默認):SynchronousQueue采用非公平鎖,同時配合一個LIFO隊列來管理多余的生產者和消費者,而后一種模式,如果生產者和消費者的處理速度有差距,則很容易出現饑渴的情況,即可能有某些生產者或者是消費者的數據永遠都得不到處理
### concurrentLinkedQueue
peek
- linux
- 常用命令
- 高級文本命令
- 面試題
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推薦
- java高級特性
- 多線程
- 實現線程的三種方式
- 同步關鍵詞
- 讀寫鎖
- 鎖的相關概念
- 多線程的join
- 有三個線程T1 T2 T3,保證順序執行
- java五種線程池
- 守護線程與普通線程
- ThreadLocal
- BlockingQueue消息隊列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty簡介
- 案例一發送字符串
- 案例二發送對象
- 輕量級RPC開發
- 簡介
- spring(IOC/AOP)
- spring初始化順序
- 通過ApplicationContextAware加載Spring上下文
- InitializingBean的作用
- 結論
- 自定義注解
- zk在框架中的應用
- hadoop
- 簡介
- hadoop集群搭建
- hadoop單機安裝
- HDFS簡介
- hdfs基本操作
- hdfs環境搭建
- 常見問題匯總
- hdfs客戶端操作
- mapreduce工作機制
- 案列-單詞統計
- 局部聚合Combiner
- 案列-流量統計(分區,排序,比較)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法實現
- 案例-求topN(分組)
- 自定義inputFormat
- 自定義outputFormat
- 框架運算全流程
- mapreduce的優化方案
- HA機制
- Hive
- 安裝
- DDL操作
- 創建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 嚴格模式
- 數據類型
- shell參數
- 函數
- 內置運算符
- 內置函數
- 自定義函數
- Transform實現
- 特殊分割符處理
- 案例
- 級聯求和accumulate
- flume
- 簡介
- 安裝
- 常用的組件
- 攔截器
- 案例
- 采集目錄到HDFS
- 采集文件到HDFS
- 多個agent串聯
- 日志采集和匯總
- 自定義攔截器
- 高可用配置
- 使用注意
- sqoop
- 安裝
- 數據導入
- 導入數據到HDFS
- 導入關系表到HIVE
- 導入表數據子集
- 增量導入
- 數據導出
- 作業
- 原理
- azkaban
- 簡介
- 安裝
- 案例
- 簡介
- command類型單一job
- command類型多job工作流flow
- HDFS操作任務
- mapreduce任務
- hive腳本任務
- hbase
- 簡介
- 安裝
- 命令行
- 基本CURD
- 過濾器查詢
- 系統架構
- 物理存儲
- 尋址機制
- 讀寫過程
- Region管理
- master工作機制
- 建表高級屬性
- 與mapreduce結合
- 協處理器
- 點擊流平臺開發
- 簡介
- storm
- 簡介
- 安裝
- 集群啟動及任務過程分析
- 單詞統計
- 并行度
- ACK容錯機制
- ACK簡介