<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                作者 張超盟 [上一篇文章](http://www.infoq.com/cn/articles/executor-framework-thread-pool-task-execution-part-01)中通過引入的一個例子介紹了在Executor框架下,提交一個任務的過程,這個過程就像我們老大的老大要找個老大來執行一個任務那樣簡單。并通過剖析ExecutorService的一種經典實現ThreadPoolExecutor來分析接收任務的主要邏輯,發現ThreadPoolExecutor的工作思路和我們帶項目的老大的工作思路完全一致。在本文中我們將繼續后面的步驟,著重描述下任務執行的過程和任務執行結果獲取的過程。會很容易發現,這個過程我們更加熟悉,因為正是每天我們工作的過程。除了ThreadPoolExecutor的內部類Worker外,對執行內容和執行結果封裝的FutureTask的表現是這部分著重需要了解的。 為了連貫期間,內容的編號延續上篇。 ### 2. 任務執行 其實應該說是任務被執行,任務是賓語。動賓結構:execute the task,執行任務,無論寫成英文還是中文似乎都是這樣。那么主語是是who呢?明顯不是調用submit的那位(線程),那是哪位呢?上篇介紹ThreadPoolExecutor主要屬性時提到其中有一個HashSet workers的集合,我們有說明這里存儲的就是線程池的工作隊列的集合,隊列的對象是Worker類型的工作線程,是ThreadPoolExecutor的一個內部類,實現了Runnable接口: ~~~ private final class Worker implements Runnable ~~~ 8)? 看作業線程干什么當然是看它的run方法在干什么。如我們所料,作業線程就是在一直調用getTask方法獲取任務,然后調用 runTask(task)方法執行任務。看到沒有,是在while循環里面,就是不干完不罷休的意思!在加班干活的苦逼的朋友們,有沒有遇見戰友的親切感覺? ~~~ public void run() { try { Runnable task = firstTask; //循環從線程池的任務隊列獲取任務 while (task != null || (task = getTask()) != null) { //執行任務 runTask(task); task = null; } } finally { workerDone(this); } } ~~~ 然后簡單看下getTask和runTask(task)方法的內容。 9) getTask方法是ThreadPoolExecutor提供給其內部類Worker的的方法。作用就是一個,從任務隊列中取任務,源源不斷地輸出任務。有沒有想到老大手里拿的總是滿滿當當的project,也是源源不斷的。 ~~~ Runnable getTask() { for (;;) { //從任務隊列的頭部取任務 r = workQueue.take(); return r; } } ~~~ 10) runTask(Runnable task)是工作線程Worker真正處理拿到的每個具體任務。看到這里才可用確認我們的猜想,之前提到[[y1]](http://infoqhelp.sinaapp.com/architectgen#_msocom_1) 的“執行任務”這個動賓結構前面的主語正是這些Worker呀。嘮叨了半天(看主要方法都看到了整整第10個了),前面都是派活,這里才是干活。和我們的工作何其相似!老大(LD),老大的老大(LD^2),老大的老大(LD^n) 非常辛苦,花了很多時間、精力在會議室、在project上想著怎么生成和安排任務,然而真的輪到咱哥們干活,可能花了不少時間,但看看流程就是這么簡單。**三個大字:“****Just do it****”。** ~~~ private void runTask(Runnable task) { //調用任務的run方法,即在Worker線程中執行Task內定義內容。 task.run(); } ~~~ 需要注意的地方出現了,調用的其實是task的run方法。看下FutureTask的run方法做了什么事情。 這里插入一個FutureTask的類圖。可以看到FutureTask實現了RunnableFuture接口,所以FutureTask即有Runnable接口的run方法來定義任務內容,也有Future接口中定義的get、cancel等方法來控制任務執行和獲取執行結果。Runnable接口自不用說,Future接口的偉大設計,就是使得實現該接口的對象可以阻塞線程直到任務執行完畢,也可以取消任務執行,檢測任務是執行完畢還是被取消了。想想在之前我們使用Thread.join()或者Thread.join(long millis)等待任務結束是多么苦澀啊。 FutureTask內部定義了一個Sync的內部類,繼承自AQS,來維護任務狀態。關于AQS的設計思路,可以參照參考Doug Lea大師的原著[_The java_._util_._concurrent Synchronizer Framework_](http://gee.cs.oswego.edu/dl/papers/aqs.pdf)。 ![2015-08-04/55c030e76f19f](https://box.kancloud.cn/2015-08-04_55c030e76f19f.jpg) 11) 和其他的同步工具類一樣,FutureTask的主要工作內容也是委托給其定義的內部類Sync來完成。 ~~~ public void run() { //調用Sync的對應方法 sync.innerRun(); } ~~~ 12)?? FutureTask.Sync.innerRun(),這樣做的目的就是為了維護任務執行的狀態,只有當執行完后才能夠獲得任務執行結果。在該方法中,首先設置執行狀態為RUNNING只有判斷任務的狀態是運行狀態,才調用任務內封裝的回調,并且在執行完成后設置回調的返回值到FutureTask的result變量上。在FutureTask中,innerRun等每個“寫”方法都會首先修改狀態位,在后續會看到innerGet等“讀”方法會先判斷狀態,然后才能決定后續的操作是否可以繼續。下圖是FutureTask.Sync中幾個重要狀態的流轉情況,和其他的同步工具類一樣,狀態位使用的也是父類AQS的state屬性。 ![2015-08-04/55c030f1cc7e6](https://box.kancloud.cn/2015-08-04_55c030f1cc7e6.png) ~~~ void innerRun() { //通過對AQS的狀態位state的判斷來判斷任務的狀態是運行狀態,則調用任務內封裝的回調,并且設置回調的返回值 if (getState() == RUNNING) innerSet(callable.call()); } void innerSet(V v) { for (;;) { int s = getState(); //設置運行狀態為完成,并且把回調額執行結果設置給result變量 if (compareAndSetState(s, RAN)) { result = v; releaseShared(0); done(); return; } } ~~~ 至此工作線程執行Task就結束了。提交的任務是由Worker工作線程執行,正是在該線程上調用Task中定義的任務內容,即封裝的Callable回調,并設置執行結果。下面就是最重要的部分:調用者如何獲取執行的結果。讓你加班那么久,總得把成果交出來吧。老大在等,因為老大的老大在等! ### 3. 獲取執行結果 前面說過,對于老大的老大這樣的使用者來說,獲取執行結果這個過程總是最容易的事情,只需調用FutureTask的get()方法即可。該方法是在Future接口中就定義的。get方法的作用就是等待執行結果。(Waits if necessary for the computation to complete, and then retrieves its result.)Future這個接口命名得真好,雖然是在未來,但是定義有一個get()方法,總是“可以掌控的未來,總是有收獲的未來!”實現該接口的FutureTask也應該是這個意思,在未來要完成的任務,但是一樣要有結果哦。 13)? FutureTask的get方法同樣委托給Sync來執行。和該方法類似,還有一個V get(long timeout, TimeUnit unit),可以配置超時時間。 ~~~ public V get() throws InterruptedException, ExecutionException { return sync.innerGet(); } ~~~ 14)? 在Sync的 innerGet方法中,調用AQS父類定義的獲取共享鎖的方法acquireSharedInterruptibly來等待執行完成。如果執行完成了則可以繼續執行后面的代碼,返回result結果,否則如果還未完成,則阻塞線程等待執行完成。[[bd2]](http://infoqhelp.sinaapp.com/architectgen#_msocom_2) 再大的老大要想獲得結果也得等老子干完了才行!可以看到調用FutureTask的get方法,進而調用到該方法的一定是想要執行結果的線程,一般應該就是提交Task的線程,而這個任務的執行是在Worker的工作線程上,通過AQS來保證執行完畢才能獲取執行結果。該方法中acquireSharedInterruptibly是AQS父類中定義的獲取共享鎖的方法,但是到底滿足什么條件可以成功獲取共享鎖,這是Sync的tryAcquireShared方法內定義的。[[bd3]](http://infoqhelp.sinaapp.com/architectgen#_msocom_3) 具體說來,innerIsDone用來判斷是否執行完畢,如果執行完畢則向下執行,返回result即可;如果判斷未完成,則調用AQS的doAcquireSharedInterruptibly來掛起當前線程,一直到滿足條件。這種思路在其他的幾種同步工具類[Semaphore](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html)、[CountDownLatch](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html)、[ReentrantLock](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantLock.html)、[ReentrantReadWriteLock](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.html)也廣泛使用。借助AQS框架,在獲取鎖時,先判斷當前狀態是否允許獲取鎖,若是允許則獲取鎖,否則獲取不成功。獲取不成功則會阻塞,進入阻塞隊列。而釋放鎖時,一般會修改狀態位,喚醒隊列中的阻塞線程。每個同步工具類的自定義同步器都繼承自AQS父類,是否可以獲取鎖根據同步類自身的功能要求覆蓋AQS對應的try前綴方法,這些方法在AQS父類中都是只有定義沒有內容。可以參照《[源碼剖析AQS在幾個同步工具類中的使用](http://www.idouba.net/sync-implementation-by-aqs/)》來詳細了解。 突然想到想想那些被稱為老大的,是不是整個career流程就是只干兩件事情:submit a task, then wait and get the result。不對,還有一件事情,不是等待,而是催。“完了沒,完了沒?schedule很緊的,抓點緊啊,要不要適當加點班啊……” ~~~ V innerGet() throws InterruptedException, ExecutionException { //獲得鎖,表示執行完畢,才能獲得后執行結果,否則阻塞等待執行完成再獲取執行結果 acquireSharedInterruptibly(0); return result; } protected int tryAcquireShared(int ignore) { return innerIsDone()? 1 : -1; } ~~~ 至此,獲得執行結果,圓滿完成任務! 老大的老大,拍著咱們老大的肩膀(或者深情的撫摸著咱們老大唏噓胡茬的臉龐)說:“親,你這活干的漂亮!”而隔壁桌座位的幾個兄弟,剛熬了幾個晚上加班交付完這波task后,發現任務隊列里又有新任務了,俺們老大又從他的另外一個老大手里接來的任務了。每個人都按照這樣的角色進行著,依照這樣的角色安排和諧愉快地進行著。。。 選擇合適的任務執行服務,如可以根據需要選擇ThreadPoolExecutor還是ScheduledThreadPoolExecutor,并定制ExecutorService的配置。 定義好任務的工作內容和結果類型,提交任務,等待任務的執行結果 | 角色名 | 任務用戶 | 任務管理者 | 任務執行者 | |--|---|---|---| | **角色屬性** | 任務的甲方 | 任務的乙方 | 乙方的工具 | | **角色說明** | 接收提交的任務;維護執行服務內部管理;配置工作線程執行任務 | 每個工作線程一直從任務執行服務獲取待執行的任務,保證任務完成后返回執行結果 | | | **Executor中對應** | 創建獲取ExecutorService、并提交Task的外部接口 | ExecutorService的各種實現。 | 執行服務內定義的配套的Worker線程。如ThreadPoolExecutor.Worker | | **主要接口方法** | submit(Callable task) | execute(Runnable command) | runTask(Runnable task) | | **現實角色映射** | 手里有活的大老大 | 領人干活的老大 | 真正干活的碼農 | | **主要工作偽代碼** | taskService = createService() future=taskService.submitTask() future.get() | executeTask() { addTask() createThread() } | while(ture) { getTask() runTask() } | ## 四、 總結 從時序圖上看主要的幾個角色是這樣配合完成任務提交、任務執行、獲取執行結果這幾個步驟的。 ![2015-08-04/55c0311d30d59](https://box.kancloud.cn/2015-08-04_55c0311d30d59.png) 1. 外面需要提交任務的角色(如例子中老大的老大),首先創建一個任務執行服務ExecutorService,一般使用工具類Executors的若干個工廠方法 創建不同特征的線程池ThreadPoolExecutor,例子中是使用newFixedThreadPool方法創建有n個固定工作線程的線程池。 2. 線程池是專門負責從外面接活的老大。把任務封裝成一個FutureTask對象,并根據輸入定義好要獲得結果的類型,就可以submit任務了。 3. 線程池就像我們團隊里管人管項目的老大,各個都有一套嫻熟、有效的辦法來對付輸入的任務和手下干活的兄弟一樣,內部有一套比較完整、細致的任務管理辦法,工作線程管理辦法,以便應付輸入的任務。這些邏輯全部在其execute方法中體現。 4. 線程池接收輸入的task,根據需要創建工作線程,啟動工作線程來執行task。 5. 工作線程在其run方法中一直循環,從線程池領取可以執行的task,調用task的run方法執行task內定義的任務。 6. FutureTask的run方法中調用其內部類Sync的innerRun方法來執行封裝的具體任務,并把任務的執行結果返回給FutureTask的result變量。 7. 當提及任務的角色調用FutureTask的get方法獲取執行結果時,Sync的innerGet方法被調用。根據任務的執行狀態判斷,任務執行完畢則返回執行結果;未執行完畢則等待。 還記得我們費了半天勁試圖找出任務執行時那個動賓結構的主語嗎?從示例上看更像是線程池在向外提供任務執行的服務。就像我們的老大在代表我們接收任務、執行任務、提交執行結果。明顯我們這些真正的Worker成了延伸,有點搞不懂到底我們是主語,還是主語延伸的工具,就像定義ThreadPoolExecutor的內部類Worker一樣。我們只是工具,不是主語,是狀語: execute the task by workers。突然想到毛主席當年的“數風流人物,還看今朝”,說的應該是這些Worker的勞苦大眾吧,怎么都今朝這么久了,俺們這些Woker們還是風流不起來呢?風騷的作者居然在上面嚴肅的時序圖上加了個風騷的小星星,向同行的Worker們致敬!
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看