<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                ## 19 LinkedBlockingQueue 源碼解析 ## 引導語 說到隊列,大家的反應可能是我從來都沒有用過,應該是不重要的 API 吧。如果這么想,那就大錯特錯了,我們平時使用到的線程池、讀寫鎖、消息隊列等等技術和框架,底層原理都是隊列,所以我們萬萬不可輕視隊列,隊列是很多高級 API 的基礎,學好隊列,對自己深入 Java 學習非常重要。 本文主要以 LinkedBlockingQueue 隊列為例,詳細描述一下底層具體的實現。 ### 1 整體架構 LinkedBlockingQueue 中文叫做鏈表阻塞隊列,這個命名很好,從命名上就知道其底層數據結構是鏈表,并且隊列是可阻塞的。接下來,我們就從整體結構上看看 LinkedBlockingQueue。 #### 1.1 類圖 首先我們來看下 LinkedBlockingQueue 類圖,如下: ![](https://img.kancloud.cn/e9/93/e993e449203a895b43ee3c633092e3f2_1074x736.jpg) 從類圖中,我們大概可以看出兩條路徑: 1. AbstractQueue -> AbstractCollection -> Collection ->Iterable 這條路徑依賴,主要是想復用 Collection 和 迭代器的一些操作,這些我們在說集合的時候,都知道這些類是干什么,能干什么,就不細說了; 2. BlockingQueue -> Queue -> Collection,BlockingQueue 和 Queue 是新出來的兩個接口,我們重點說一下。 Queue 是最基礎的接口,幾乎所有的隊列實現類都會實現這個接口,該接口定義出了隊列的三大類操作: 新增操作: 1. add 隊列滿的時候拋出異常; 2. offer 隊列滿的時候返回 false。 查看并刪除操作: 1. remove 隊列空的時候拋異常; 2. poll 隊列空的時候返回 null。 只查看不刪除操作: 1. element 隊列空的時候拋異常; 2. peek 隊列空的時候返回 null。 一共 6 種方法,除了以上分類方法,也可以分成兩類: 1. 遇到隊列滿或空的時候,拋異常,如 add、remove、element; 2. 遇到隊列滿或空的時候,返回特殊值,如 offer、poll、peek。 實際上,這些都比較難記憶。每次需要使用的時候,我都會看會源碼,才能想起這個方法是拋異常還是返回特殊值。 BlockingQueue 在 Queue 的基礎上加上了阻塞的概念,比如一直阻塞,還是阻塞一段時間。為了方便記憶,我們畫一個表格,如下: | | |特殊值|一直阻塞|阻塞一段時間| | --- | --- | --- | --- | --- | |新增操作–隊列滿|add|offer返回false|put|offer過超時時間返回false| |查看并刪除操作–隊列空 |remove|poll 返回 null|take|阻塞一段時間| |只查看不刪除操作–隊列空 |element|peek 返回 null|暫無|暫無| PS: remove 方法,BlockingQueue 類注釋中定義的是拋異常,但 LinkedBlockingQueue 中 remove 方法實際是返回 false。 從表格中可以看到,在新增和查看并刪除兩大類操作上,BlockingQueue 增加了阻塞的功能,而且可以選擇一直阻塞,或者阻塞一段時間后,返回特殊值。 #### 1.2 類注釋 我們看看從 LinkedBlockingQueue 的類注釋中能得到那些信息: 1. 基于鏈表的阻塞隊列,其底層的數據結構是鏈表; 2. 鏈表維護先入先出隊列,新元素被放在隊尾,獲取元素從隊頭部拿; 3. 鏈表大小在初始化的時候可以設置,默認是 Integer 的最大值; 4. 可以使用 Collection 和 Iterator 兩個接口的所有操作,因為實現了兩者的接口。 #### 1.3 內部構成 LinkedBlockingQueue 內部構成簡單來說,分成三個部分:鏈表存儲 + 鎖 + 迭代器,我們來看下源碼。 ``` // 鏈表結構 begin //鏈表的元素 static class Node<E> { E item; //當前元素的下一個,為空表示當前節點是最后一個 Node<E> next; Node(E x) { item = x; } } //鏈表的容量,默認 Integer.MAX_VALUE private final int capacity; //鏈表已有元素大小,使用 AtomicInteger,所以是線程安全的 private final AtomicInteger count = new AtomicInteger(); //鏈表頭 transient Node<E> head; //鏈表尾 private transient Node<E> last; // 鏈表結構 end // 鎖 begin //take 時的鎖 private final ReentrantLock takeLock = new ReentrantLock(); // take 的條件隊列,condition 可以簡單理解為基于 ASQ 同步機制建立的條件隊列 private final Condition notEmpty = takeLock.newCondition(); // put 時的鎖,設計兩把鎖的目的,主要為了 take 和 put 可以同時進行 private final ReentrantLock putLock = new ReentrantLock(); // put 的條件隊列 private final Condition notFull = putLock.newCondition(); // 鎖 end // 迭代器 // 實現了自己的迭代器 private class Itr implements Iterator<E> { ……………… } ``` 從代碼上來看,結構是非常清晰的,三種結構各司其職: 1. 鏈表的作用是為了保存當前節點,節點中的數據可以是任意東西,是一個泛型,比如說隊列被應用到線程池時,節點就是線程,比如隊列被應用到消息隊列中,節點就是消息,節點的含義主要看隊列被使用的場景; 2. 鎖有 take 鎖和 put 鎖,是為了保證隊列操作時的線程安全,設計兩種鎖,是為了 take 和 put 兩種操作可以同時進行,互不影響。 #### 1.4 初始化 初始化有三種方式: 1. 指定鏈表容量大小; 2. 不指定鏈表容量大小,默認是 Integer 的最大值; 3. 對已有集合數據進行初始化。 源碼如下: ``` // 不指定容量,默認 Integer 的最大值 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } // 指定鏈表容量大小,鏈表頭尾相等,節點值(item)都是 null public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } // 已有集合數據進行初始化 public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { // 集合內的元素不能為空 if (e == null) throw new NullPointerException(); // capacity 代表鏈表的大小,在這里是 Integer 的最大值 // 如果集合類的大小大于 Integer 的最大值,就會報錯 // 其實這個判斷完全可以放在 for 循環外面,這樣可以減少 Integer 的最大值次循環(最壞情況) if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } } ``` 對于初始化源碼,我們說明兩點: 1. 初始化時,容量大小是不會影響性能的,只影響在后面的使用,因為初始化隊列太小,容易導致沒有放多少就會報隊列已滿的錯誤; 2. 在對給定集合數據進行初始化時,源碼給了一個不優雅的示范,我們不反對在每次 for 循環的時候,都去檢查當前鏈表的大小是否超過容量,但我們希望在 for 循環開始之前就做一步這樣的工作。舉個列子,給定集合大小是 1 w,鏈表大小是 9k,按照現在代碼實現,只能在 for 循環 9k 次時才能發現,原來給定集合的大小已經大于鏈表大小了,導致 9k 次循環都是在浪費資源,還不如在 for 循環之前就 check 一次,如果 1w > 9k,直接報錯即可。 ### 2 阻塞新增 新增有多種方法,如:add、put、offer,三者的區別上文有說。我們拿 put 方法為例,put 方法在碰到隊列滿的時候,會一直阻塞下去,直到隊列不滿時,并且自己被喚醒時,才會繼續去執行,源碼如下: ``` // 把e新增到隊列的尾部。 // 如果有可以新增的空間的話,直接新增成功,否則當前線程陷入等待 public void put(E e) throws InterruptedException { // e 為空,拋出異常 if (e == null) throw new NullPointerException(); // 預先設置 c 為 -1,約定負數為新增失敗 int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 設置可中斷鎖 putLock.lockInterruptibly(); try { // 隊列滿了 // 當前線程阻塞,等待其他線程的喚醒(其他線程 take 成功后就會喚醒此處被阻塞的線程) while (count.get() == capacity) { // await 無限等待 notFull.await(); } // 隊列沒有滿,直接新增到隊列的尾部 enqueue(node); // 新增計數賦值,注意這里 getAndIncrement 返回的是舊值 // 這里的 c 是比真實的 count 小 1 的 c = count.getAndIncrement(); // 如果鏈表現在的大小 小于鏈表的容量,說明隊列未滿 // 可以嘗試喚醒一個 put 的等待線程 if (c + 1 < capacity) notFull.signal(); } finally { // 釋放鎖 putLock.unlock(); } // c==0,代表隊列里面有一個元素 // 會嘗試喚醒一個take的等待線程 if (c == 0) signalNotEmpty(); } // 入隊,把新元素放到隊尾 private void enqueue(Node<E> node) { last = last.next = node; } ``` 從源碼中我們可以總結以下幾點: 1. 往隊列新增數據,第一步是上鎖,所以新增數據是線程安全的; 2. 隊列新增數據,簡單的追加到鏈表的尾部即可; 3. 新增時,如果隊列滿了,當前線程是會被阻塞的,阻塞的底層使用是鎖的能力,底層實現其它也和隊列相關,原理我們在鎖章節會說到; 4. 新增數據成功后,在適當時機,會喚起 put 的等待線程(隊列不滿時),或者 take 的等待線程(隊列不為空時),這樣保證隊列一旦滿足 put 或者 take 條件時,立馬就能喚起阻塞線程,繼續運行,保證了喚起的時機不被浪費。 以上就是 put 方法的原理,至于 offer 方法阻塞超過一端時間后,仍未成功,就會直接返回默認值的實現,和 put 方法相比只修改了幾行代碼,如下截圖: ![](https://img.kancloud.cn/dc/64/dc649ee824669e00ae5f86673ee4cb54_2444x1372.jpg) ### 3 阻塞刪除 刪除的方法也很多,我們主要看兩個關鍵問題: 1. 刪除的原理是怎樣的; 2. 查看并刪除和只查看不刪除兩種的區別是如何實現的。 首先我們來看第一個問題,我們以 take 方法為例,說明一下查看并刪除的底層源碼: ``` // 阻塞拿數據 public E take() throws InterruptedException { E x; // 默認負數,代表失敗 int c = -1; // count 代表當前鏈表數據的真實大小 final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 空隊列時,阻塞,等待其他線程喚醒 while (count.get() == 0) { notEmpty.await(); } // 非空隊列,從隊列的頭部拿一個出來 x = dequeue(); // 減一計算,注意 getAndDecrement 返回的值是舊值 // c 比真實的 count 大1 c = count.getAndDecrement(); // 如果隊列里面有值,從 take 的等待線程里面喚醒一個。 // 意思是隊列里面有值啦,喚醒之前被阻塞的線程 if (c > 1) notEmpty.signal(); } finally { // 釋放鎖 takeLock.unlock(); } // 如果隊列空閑還剩下一個,嘗試從 put 的等待線程中喚醒一個 if (c == capacity) signalNotFull(); return x; } // 隊頭中取數據 private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null;// 頭節點指向 null,刪除 return x; } ``` 整體流程和 put 很相似,都是先上鎖,然后從隊列的頭部拿出數據,如果隊列為空,會一直阻塞到隊列有值為止。 而查看不刪除元素更加簡單,直接把隊列頭的數據拿出來即可,我們以 peek 為例,源碼如下: ``` // 查看并不刪除元素,如果隊列為空,返回 null public E peek() { // count 代表隊列實際大小,隊列為空,直接返回 null if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 拿到隊列頭 Node<E> first = head.next; // 判斷隊列頭是否為空,并返回 if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } } ``` 可以看出,查看并刪除,和查看不刪除兩者從隊頭拿數據的邏輯不太一致,從而導致一個會刪除,一個不會刪除隊頭數據。 ### 4 總結 本文通過 LinkedBlockingQueue 的源碼,來介紹了下鏈表隊列,當隊列滿和空的場景下,新增和刪除數據時,隊列有啥變化。 隊列本身就是一個阻塞工具,我們可以把這個工具應用到各種阻塞場景中,比如說隊列應用到線程池,當線程池跑滿時,我們把新的請求都放到阻塞隊列中等待;隊列應用到消息隊列,當消費者處理能力有限時,我們可以把消息放到隊列中等待,讓消費者慢慢消費;每應用到一個新的場景中,都是一個新的技術工具,所以學好隊列,用處很大。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看