<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                ## 21 DelayQueue 源碼解析 ## 引導語 之前我們說的阻塞隊列,都是資源足夠時立馬執行。本章我們說的隊列比較特殊,是一種延遲隊列,意思是延遲執行,并且可以設置延遲多久之后執行,比如設置過 5 秒鐘之后再執行,在一些延遲執行的場景被大量使用,比如說延遲對賬等等。 ### 1 整體設計 DelayQueue 延遲隊列底層使用的是鎖的能力,比如說要在當前時間往后延遲 5 秒執行,那么當前線程就會沉睡 5 秒,等 5 秒后線程被喚醒時,如果能獲取到資源的話,線程即可立馬執行。原理上似乎很簡單,但內部實現卻很復雜,有很多難點,比如當運行資源不夠,多個線程同時被喚醒時,如何排隊等待?比如說在何時阻塞?何時開始執行等等?接下來我們從源碼角度來看下是如何實現的。 #### 1.1 類注釋 類注釋上比較簡單,只說了三個概念: 1. 隊列中元素將在過期時被執行,越靠近隊頭,越早過期; 2. 未過期的元素不能夠被 take; 3. 不允許空元素。 這三個概念,其實就是三個問題,下文我們會一一看下這三點是如何實現的。 #### 1.2 類圖 DelayQueue 的類圖和之前的隊列一樣,不多說,關鍵是 DelayQueue 類上是有泛型的,如下: ``` public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { ``` 從泛型中可以看出,DelayQueue 中的元素必須是 Delayed 的子類,Delayed 是表達延遲能力的關鍵接口,其繼承了 Comparable 接口,并定義了還剩多久過期的方法,如下: ``` public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); } ``` 也就是說 DelayQueue 隊列中的元素必須是實現 Delayed 接口和 Comparable 接口的,并覆寫了 getDelay 方法和 compareTo 的方法才行,不然在編譯時,編譯器就會提醒我們元素必須強制實現 Delayed 接口。 除此之外 DelayQueue 還大量使用了 PriorityQueue 隊列的大量功能,這個和 SynchronousQueue 隊列很像,大量復用了其它基礎類的邏輯,代碼示例如下: ![](https://img.kancloud.cn/7c/d7/7cd77d4024b03aaeb16cee087c5b0e8e_1365x1371.jpg) PriorityQueue 中文叫做優先級隊列,在此處的作用就是可以根據過期時間做優先級排序,讓先過期的可以先執行,用來實現類注釋中的第一點。 這里的復用的思想還是蠻重要的,我們在源碼中經常會遇到這種思想,比如說 LinkedHashMap 復用 HashMap 的能力,Set 復用 Map 的能力,還有此處的 DelayQueue 復用 PriorityQueue 的能力。小結一下,如果想要復用需要做到哪些: 1. 需要把能遇見可復用的功能盡量抽象,并開放出可擴展的地方,比如說 HashMap 在操作數組的方法中,都給 LinkedHashMap 開放出很多 after 開頭的方法,便于 LinkedHashMap 進行排序、刪除等等; 2. 采用組合或繼承兩種手段進行復用,比如 LinkedHashMap 采用的繼承、 Set 和 DelayQueue 采用的組合,組合的意思就是把可復用的類給依賴進來。 ### 2 演示 為了方便大家理解,寫了一個演示的 demo,演示了一下: ``` public class DelayQueueDemo { // 隊列消息的生產者 static class Product implements Runnable { private final BlockingQueue queue; public Product(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { log.info("begin put"); long beginTime = System.currentTimeMillis(); queue.put(new DelayedDTO(System.currentTimeMillis() + 2000L,beginTime));//延遲 2 秒執行 queue.put(new DelayedDTO(System.currentTimeMillis() + 5000L,beginTime));//延遲 5 秒執行 queue.put(new DelayedDTO(System.currentTimeMillis() + 1000L * 10,beginTime));//延遲 10 秒執行 log.info("end put"); } catch (InterruptedException e) { log.error("" + e); } } } // 隊列的消費者 static class Consumer implements Runnable { private final BlockingQueue queue; public Consumer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { log.info("Consumer begin"); ((DelayedDTO) queue.take()).run(); ((DelayedDTO) queue.take()).run(); ((DelayedDTO) queue.take()).run(); log.info("Consumer end"); } catch (InterruptedException e) { log.error("" + e); } } } @Data // 隊列元素,實現了 Delayed 接口 static class DelayedDTO implements Delayed { Long s; Long beginTime; public DelayedDTO(Long s,Long beginTime) { this.s = s; this.beginTime =beginTime; } @Override public long getDelay(TimeUnit unit) { return unit.convert(s - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } public void run(){ log.info("現在已經過了{}秒鐘",(System.currentTimeMillis() - beginTime)/1000); } } // demo 運行入口 public static void main(String[] args) throws InterruptedException { BlockingQueue q = new DelayQueue(); DelayQueueDemo.Product p = new DelayQueueDemo.Product(q); DelayQueueDemo.Consumer c = new DelayQueueDemo.Consumer(q); new Thread(c).start(); new Thread(p).start(); } } ``` 打印出來的結果如下: ``` 06:57:50.544 [Thread-0] Consumer begin 06:57:50.544 [Thread-1] begin put 06:57:50.551 [Thread-1] end put 06:57:52.554 [Thread-0] 延遲了2秒鐘才執行 06:57:55.555 [Thread-0] 延遲了5秒鐘才執行 06:58:00.555 [Thread-0] 延遲了10秒鐘才執行 06:58:00.556 [Thread-0] Consumer end ``` 寫這個代碼的目的主要想演示一下延遲執行的例子,我們大概的思路是: 1. 新建隊列的元素,如 DelayedDTO,必須實現 Delayed 接口,我們在 getDelay 方法中實現了現在離過期時間還剩多久的方法。 2. 定義隊列元素的生產者,和消費者,對應著代碼中的 Product 和 Consumer。 3. 對生產者和消費者就行初始化和管理,對應著我們的 main 方法。 雖然這只是一個簡單的 demo,但實際工作中,我們使用 DelayQueue 基本上就是這種思想,只不過寫代碼的時候會更加通用和周全,接下來我們來看下 DelayQueue 是如何實現 put 和 take 的。 ### 3 放數據 我們以 put 為例,put 調用的是 offer 的方法,offer 的源碼如下: ``` public boolean offer(E e) { final ReentrantLock lock = this.lock; // 上鎖 lock.lock(); try { // 使用 PriorityQueue 的擴容,排序等能力 q.offer(e); // 如果恰好剛放進去的元素正好在隊列頭 // 立馬喚醒 take 的阻塞線程,執行 take 操作 // 如果元素需要延遲執行的話,可以使其更快的沉睡計時 if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { // 釋放鎖 lock.unlock(); } } ``` 可以看到其實底層使用到的是 PriorityQueue 的 offer 方法,我們來看下: ``` // 新增元素 public boolean offer(E e) { // 如果是空元素的話,拋異常 if (e == null) throw new NullPointerException(); modCount++; int i = size; // 隊列實際大小大于容量時,進行擴容 // 擴容策略是:如果老容量小于 64,2 倍擴容,如果大于 64,50 % 擴容 if (i >= queue.length) grow(i + 1); size = i + 1; // 如果隊列為空,當前元素正好處于隊頭 if (i == 0) queue[0] = e; else // 如果隊列不為空,需要根據優先級進行排序 siftUp(i, e); return true; } // 按照從小到大的順序排列 private void siftUpComparable(int k, E x) { Comparable<? super E> key = (Comparable<? super E>) x; // k 是當前隊列實際大小的位置 while (k > 0) { // 對 k 進行減倍 int parent = (k - 1) >>> 1; Object e = queue[parent]; // 如果 x 比 e 大,退出,把 x 放在 k 位置上 if (key.compareTo((E) e) >= 0) break; // x 比 e 小,繼續循環,直到找到 x 比隊列中元素大的位置 queue[k] = e; k = parent; } queue[k] = key; } ``` 可以看到,PriorityQueue 的 offer 方法主要做了三件事情: 1. 對新增元素進行判空; 2. 對隊列進行擴容,擴容策略和集合的擴容策略很相近; 3. 根據元素的 compareTo 方法進行排序,我們希望最終排序的結果是從小到大的,因為我們想讓隊頭的都是過期的數據,我們需要在 compareTo 方法里面實現:通過每個元素的過期時間進行排序,如下: ``` (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); ``` 這樣便可實現越快過期的元素越能排到隊頭。 可以看到,新增數據時,只是使用到了 compareTo 方法,來對隊列中的元素進行排序,接下來我們看下,取數據時,是如何操作的。 ### 4 拿數據 取數據時,如果發現有元素的過期時間到了,就能拿出數據來,如果沒有過期元素,那么線程就會一直阻塞,我們以 take 為例子,來看一下核心源碼: ``` for (;;) { // 從隊頭中拿數據出來 E first = q.peek(); // 如果為空,說明隊列中,沒有數據,阻塞住 if (first == null) available.await(); else { // 獲取隊頭數據的過期時間 long delay = first.getDelay(NANOSECONDS); // 如果過期了,直接返回隊頭數據 if (delay <= 0) return q.poll(); // 引用置為 null ,便于 gc,這樣可以讓線程等待時,回收 first 變量 first = null; // leader 不為空的話,表示當前隊列元素之前已經被設置過阻塞時間了 // 直接阻塞當前線程等待。 if (leader != null) available.await(); else { // 之前沒有設置過阻塞時間,按照一定的時間進行阻塞 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 進行阻塞 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } ``` 可以看到阻塞等待的功能底層使用的是鎖的能力,這個我們在后面章節中會說到。 以上演示的 take 方法是會無限阻塞,直到隊頭的過期時間到了才會返回,如果不想無限阻塞,可以嘗試 poll 方法,設置超時時間,在超時時間內,隊頭元素還沒有過期的話,就會返回 null。 ### 5 總結 DelayQueue 是非常有意思的隊列,底層使用了排序和超時阻塞實現了延遲隊列,排序使用的是 PriorityQueue 排序能力,超時阻塞使用得是鎖的等待能力,可以看出 DelayQueue 其實就是為了滿足延遲執行的場景,在已有 API 的基礎上進行了封裝,我們在工作中,可以學習這種思想,對已有的功能能復用的盡量復用,減少開發的工作量。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看