[TOC]
## BlockingQueue
### 核心方法
#### 放入數據
(1)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當前執行方法
?的線程); ?
? ? ? (2)offer(E o, long timeout, TimeUnit unit):可以設定等待的時間,如果在指定的時間內,還不能往隊列中加入BlockingQueue,則返回失敗。
(3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue里面有空間再繼續.
#### 獲取數據
(1)poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null;
(2)poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間內,隊列一旦有數據可取,則立即返回隊列中的數據。否則知道時間
超時還沒有數據可取,返回失敗。
(3)take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的數據被加入;?
(4)drainTo():一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數),通過該方法,可以提升獲取數據效率;不需要多次分批加鎖或釋放鎖。
## ArrayBlockingQueue
基于數組的阻塞隊列實現,在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,這是一個常用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue內部還保存著兩個整形變量,分別標識著隊列的頭部和尾部在數組中的位置。
ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味著兩者無法真正并行運行,這點尤其不同于LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以采用分離鎖,從而實現生產者和消費者操作的完全并行運行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的數據寫入和獲取操作已經足夠輕巧,以至于引入獨立的鎖機制,除了給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在于,前者在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而后者則會生成一個額外的Node對象。這在長時間內需要高效并發地處理大批量數據的系統中,其對于GC的影響還是存在一定的區別。而在創建ArrayBlockingQueue時,我們還可以控制對象的內部鎖是否采用公平鎖,默認采用非公平鎖。
~~~
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//獲得鎖
try {
while (count == items.length)
notFull.await();//隊伍滿了 暫時交出鎖
enqueue(e);
} finally {
lock.unlock();//釋放鎖
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//獲得鎖
try {
while (count == 0)
notEmpty.await();//隊伍空了 暫時交出鎖
return dequeue();
} finally {
lock.unlock();//獲得鎖
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();//喚醒take
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();//喚醒put
return x;
}
~~~
## LinkedBlockingQueue
基于鏈表的阻塞隊列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩沖隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,并緩存在隊列內部,而生產者立即返回;只有當隊列緩沖區達到最大值緩存容量時(LinkedBlockingQueue可以通過構造函數指定該值),才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對于消費者這端的處理也基于同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理并發數據,還因為其對于生產者端和消費者端分別采用了獨立的鎖來控制數據同步,這也意味著在高并發的情況下生產者和消費者可以并行地操作隊列中的數據,以此來提高整個隊列的并發性能。
作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個類似無限大小的容量(Integer.MAX\_VALUE),這樣的話,如果生產者的速度一旦大于消費者的速度,也許還沒有等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。
ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞隊列,一般情況下,在處理多線程間的生產者消費者問題,使用這兩個類足以。
~~~
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private final int capacity;//容量 默認為Integer.MAX_VALUE
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
}
~~~
## SynchronousQueue
SynchronousQueue(后面稱SQ)內部沒有容量,所以不能通過peek方法獲取頭部元素;也不能單獨插入元素,可以簡單理解為它**的插入和移除是“一對”對稱的操作**。為了兼容 Collection 的某些操作(例如contains),SQ 扮演了一個空集合的角色。
**當生產者插入后如果沒有消費者取出,就生產者這就會一直堵塞,直到消費者取出**
SQ 的一個典型應用場景是在線程池中,Executors.newCachedThreadPool() 就使用了它,這個構造使線程池根據需要(新任務到來時)創建新的線程,如果有空閑線程則會重復使用,線程空閑了60秒后會被回收。
SynchronousQueue是一個隊列和棧算法實現,在SynchronousQueue中雙隊列FIFO提供公平模式,而雙棧LIFO提供的則是非公平模式。
~~~
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
Thread putThread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("put thread start");
try {
queue.put(1);
} catch (InterruptedException e) {
}
System.out.println("put thread end");
}
});
Thread takeThread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("take thread start");
try {
System.out.println("take from putThread: " + queue.take());
} catch (InterruptedException e) {
}
System.out.println("take thread end");
}
});
putThread.start();
Thread.sleep(1000);
takeThread.start();
}
~~~
一種輸出結果如下:
~~~
put thread start
take thread start
take from putThread: 1
put thread end
take thread end
~~~
從結果可以看出,put線程執行queue.put(1) 后就被阻塞了,只有take線程進行了消費,put線程才可以返回。可以認為這是一種線程與線程間一對一傳遞消息的模型。
## PriorityBlockingQueue?
和ArrayBlockingQueue一樣內部使用數組實現,插入和獲取數據使用同一把鎖。不同的是,不管有無指定長度,都會實現可以實現自動擴容;在構造函數需傳入comparator,用于插入元素時繼續排序,若沒有傳入comparator,則插入的元素必須實現Comparatable接口。
阻塞隊列中的優先級排序是基于一個**堆排序**
~~~
private transient Object[] queue;
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
//擴容
tryGrow(array, cap);
try {
//插入并排序
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
~~~
## DelayQueue
DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,**在創建元素時可以指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。**
DelayQueue非常有用,可以運用在以下兩個應用場景:?
* 緩存系統的設計:使用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,就表示有緩存到期了。?
* 定時任務調度:使用DelayQueue保存當天要執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,比如Timer就是使用DelayQueue實現的。
~~~
//入隊操作與PriorityBlockingQueue基本一致,這里不再敘述,需要根據延時排序
public boolean offer(E e) {
// 獲取全局獨占鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 向優先隊列中插入元素
q.offer(e);
// 如果隊首元素是剛插入的元素,則設置leader為null,并喚醒阻塞在available上的線程
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
// 釋放全局獨占鎖
lock.unlock();
}
}
~~~
~~~
public E take() throws InterruptedException {
// 獲取全局獨占鎖
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 獲取隊首元素
E first = q.peek();
// 隊首為空,則阻塞當前線程
if (first == null)
available.await();
else {
// 獲取隊首元素的超時時間
long delay = first.getDelay(NANOSECONDS);
// 已超時,直接出隊
if (delay <= 0)
return q.poll();
// 釋放first的引用,避免內存泄漏
first = null; // don't retain ref while waiting
// leader != null表明有其他線程在操作,阻塞當前線程
if (leader != null)
available.await();
else {
// leader指向當前線程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 超時阻塞
available.awaitNanos(delay);
} finally {
// 釋放leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// leader為null并且隊列不為空,說明沒有其他線程在等待,那就通知條件隊列
if (leader == null && q.peek() != null)
available.signal();
// 釋放全局獨占鎖
lock.unlock();
}
~~~
- Java
- Object
- 內部類
- 異常
- 注解
- 反射
- 靜態代理與動態代理
- 泛型
- 繼承
- JVM
- ClassLoader
- String
- 數據結構
- Java集合類
- ArrayList
- LinkedList
- HashSet
- TreeSet
- HashMap
- TreeMap
- HashTable
- 并發集合類
- Collections
- CopyOnWriteArrayList
- ConcurrentHashMap
- Android集合類
- SparseArray
- ArrayMap
- 算法
- 排序
- 常用算法
- LeetCode
- 二叉樹遍歷
- 劍指
- 數據結構、算法和數據操作
- 高質量的代碼
- 解決問題的思路
- 優化時間和空間效率
- 面試中的各項能力
- 算法心得
- 并發
- Thread
- 鎖
- java內存模型
- CAS
- 原子類Atomic
- volatile
- synchronized
- Object.wait-notify
- Lock
- Lock之AQS
- Lock子類
- 鎖小結
- 堵塞隊列
- 生產者消費者模型
- 線程池