<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                ## 創建和運行任務 如果無法通過并行流實現并發,則必須創建并運行自己的任務。稍后你將看到運行任務的理想Java 8方法是CompletableFuture,但我們將使用更基本的工具介紹概念。 Java并發的歷史始于非常原始和有問題的機制,并且充滿了各種嘗試的改進。這些主要歸入附錄:[低級并發(Appendix: Low-Level Concurrency)](./Appendix-Low-Level-Concurrency.md)。在這里,我們將展示一個規范形式,表示創建和運行任務的最簡單,最好的方法。與并發中的所有內容一樣,存在各種變體,但這些變體要么降級到該附錄,要么超出本書的范圍。 - Tasks and Executors 在Java的早期版本中,你通過直接創建自己的Thread對象來使用線程,甚至將它們子類化以創建你自己的特定“任務線程”對象。你手動調用了構造函數并自己啟動了線程。 創建所有這些線程的開銷變得非常重要,現在不鼓勵采用手動操作方法。在Java 5中,添加了類來為你處理線程池。你可以將任務創建為單獨的類型,然后將其交給ExecutorService以運行該任務,而不是為每種不同類型的任務創建新的Thread子類型。ExecutorService為你管理線程,并且在運行任務后重新循環線程而不是丟棄線程。 首先,我們將創建一個幾乎不執行任務的任務。它“sleep”(暫停執行)100毫秒,顯示其標識符和正在執行任務的線程的名稱,然后完成: ```java // concurrent/NapTask.java import onjava.Nap; public class NapTask implements Runnable { final int id; public NapTask(int id) { this.id = id; } @Override public void run() { new Nap(0.1);// Seconds System.out.println(this + " "+ Thread.currentThread().getName()); } @Override public String toString() { return"NapTask[" + id + "]"; } } ``` 這只是一個**Runnable**:一個包含**run()**方法的類。它沒有包含實際運行任務的機制。我們使用**Nap**類中的“sleep”: ```java // onjava/Nap.java package onjava; import java.util.concurrent.*; public class Nap { public Nap(double t) { // Seconds try { TimeUnit.MILLISECONDS.sleep((int)(1000 * t)); } catch(InterruptedException e){ throw new RuntimeException(e); } } public Nap(double t, String msg) { this(t); System.out.println(msg); } } ``` 為了消除異常處理的視覺干擾,這被定義為實用程序。第二個構造函數在超時時顯示一條消息 對**TimeUnit.MILLISECONDS.sleep()**的調用獲取“當前線程”并在參數中將其置于休眠狀態,這意味著該線程被掛起。這并不意味著底層處理器停止。操作系統將其切換到其他任務,例如在你的計算機上運行另一個窗口。OS任務管理器定期檢查**sleep()**是否超時。當它執行時,線程被“喚醒”并給予更多處理時間。 你可以看到**sleep()**拋出一個受檢的**InterruptedException**;這是原始Java設計中的一個工件,它通過突然斷開它們來終止任務。因為它往往會產生不穩定的狀態,所以后來不鼓勵終止。但是,我們必須在需要或仍然發生終止的情況下捕獲異常。 要執行任務,我們將從最簡單的方法--SingleThreadExecutor開始: ```java //concurrent/SingleThreadExecutor.java import java.util.concurrent.*; import java.util.stream.*; import onjava.*; public class SingleThreadExecutor { public static void main(String[] args) { ExecutorService exec = Executors.newSingleThreadExecutor(); IntStream.range(0, 10) .mapToObj(NapTask::new) .forEach(exec::execute); System.out.println("All tasks submitted"); exec.shutdown(); while(!exec.isTerminated()) { System.out.println( Thread.currentThread().getName()+ " awaiting termination"); new Nap(0.1); } } } ``` 輸出結果: ``` All tasks submitted main awaiting termination main awaiting termination NapTask[0] pool-1-thread-1 main awaiting termination NapTask[1] pool-1-thread-1 main awaiting termination NapTask[2] pool-1-thread-1 main awaiting termination NapTask[3] pool-1-thread-1 main awaiting termination NapTask[4] pool-1-thread-1 main awaiting termination NapTask[5] pool-1-thread-1 main awaiting termination NapTask[6] pool-1-thread-1 main awaiting termination NapTask[7] pool-1-thread-1 main awaiting termination NapTask[8] pool-1-thread-1 main awaiting termination NapTask[9] pool-1-thread-1 ``` 首先請注意,沒有**SingleThreadExecutor**類。**newSingleThreadExecutor()**是**Executors**中的工廠,它創建特定類型的[^4] 我創建了十個NapTasks并將它們提交給ExecutorService,這意味著它們開始自己運行。然而,在此期間,main()繼續做事。當我運行callexec.shutdown()時,它告訴ExecutorService完成已經提交的任務,但不接受任何新任務。此時,這些任務仍然在運行,因此我們必須等到它們在退出main()之前完成。這是通過檢查exec.isTerminated()來實現的,這在所有任務完成后變為true。 請注意,main()中線程的名稱是main,并且只有一個其他線程pool-1-thread-1。此外,交錯輸出顯示兩個線程確實同時運行。 如果你只是調用exec.shutdown(),程序將完成所有任務。也就是說,不需要**while(!exec.isTerminated())**。 ```java // concurrent/SingleThreadExecutor2.java import java.util.concurrent.*; import java.util.stream.*; public class SingleThreadExecutor2 { public static void main(String[] args)throws InterruptedException { ExecutorService exec =Executors.newSingleThreadExecutor(); IntStream.range(0, 10) .mapToObj(NapTask::new) .forEach(exec::execute); exec.shutdown(); } } ``` 輸出結果: ``` NapTask[0] pool-1-thread-1 NapTask[1] pool-1-thread-1 NapTask[2] pool-1-thread-1 NapTask[3] pool-1-thread-1 NapTask[4] pool-1-thread-1 NapTask[5] pool-1-thread-1 NapTask[6] pool-1-thread-1 NapTask[7] pool-1-thread-1 NapTask[8] pool-1-thread-1 NapTask[9] pool-1-thread-1 ``` 一旦你callexec.shutdown(),嘗試提交新任務將拋出RejectedExecutionException。 ```java // concurrent/MoreTasksAfterShutdown.java import java.util.concurrent.*; public class MoreTasksAfterShutdown { public static void main(String[] args) { ExecutorService exec =Executors.newSingleThreadExecutor(); exec.execute(newNapTask(1)); exec.shutdown(); try { exec.execute(newNapTask(99)); } catch(RejectedExecutionException e) { System.out.println(e); } } } ``` 輸出結果: ``` java.util.concurrent.RejectedExecutionException: TaskNapTask[99] rejected from java.util.concurrent.ThreadPoolExecutor@4e25154f[Shutting down, pool size = 1,active threads = 1, queued tasks = 0, completed tasks =0]NapTask[1] pool-1-thread-1 ``` **exec.shutdown()**的替代方法是**exec.shutdownNow()**,它除了不接受新任務外,還會嘗試通過中斷任務來停止任何當前正在運行的任務。同樣,中斷是錯誤的,容易出錯并且不鼓勵。 - 使用更多線程 使用線程的重點是(幾乎總是)更快地完成任務,那么我們為什么要限制自己使用SingleThreadExecutor呢?查看執行**Executors**的Javadoc,你將看到更多選項。例如CachedThreadPool: ```java // concurrent/CachedThreadPool.java import java.util.concurrent.*; import java.util.stream.*; public class CachedThreadPool { public static void main(String[] args) { ExecutorService exec =Executors.newCachedThreadPool(); IntStream.range(0, 10) .mapToObj(NapTask::new) .forEach(exec::execute); exec.shutdown(); } } ``` 輸出結果: ``` NapTask[7] pool-1-thread-8 NapTask[4] pool-1-thread-5 NapTask[1] pool-1-thread-2 NapTask[3] pool-1-thread-4 NapTask[0] pool-1-thread-1 NapTask[8] pool-1-thread-9 NapTask[2] pool-1-thread-3 NapTask[9] pool-1-thread-10 NapTask[6] pool-1-thread-7 NapTask[5] pool-1-thread-6 ``` 當你運行這個程序時,你會發現它完成得更快。這是有道理的,每個任務都有自己的線程,所以它們都并行運行,而不是使用相同的線程來順序運行每個任務。這似乎沒毛病,很難理解為什么有人會使用SingleThreadExecutor。 要理解這個問題,我們需要一個更復雜的任務: ```java // concurrent/InterferingTask.java public class InterferingTask implements Runnable { final int id; private static Integer val = 0; public InterferingTask(int id) { this.id = id; } @Override public void run() { for(int i = 0; i < 100; i++) val++; System.out.println(id + " "+ Thread.currentThread().getName() + " " + val); } } ``` 每個任務增加val一百次。這似乎很簡單。讓我們用CachedThreadPool嘗試一下: ```java // concurrent/CachedThreadPool2.java import java.util.concurrent.*; import java.util.stream.*; public class CachedThreadPool2 { public static void main(String[] args) { ExecutorService exec =Executors.newCachedThreadPool(); IntStream.range(0, 10) .mapToObj(InterferingTask::new) .forEach(exec::execute); exec.shutdown(); } } ``` 輸出結果: ``` 0 pool-1-thread-1 200 1 pool-1-thread-2 200 4 pool-1-thread-5 300 5 pool-1-thread-6 400 8 pool-1-thread-9 500 9 pool-1-thread-10 600 2 pool-1-thread-3 700 7 pool-1-thread-8 800 3 pool-1-thread-4 900 6 pool-1-thread-7 1000 ``` 輸出不是我們所期望的,并且從一次運行到下一次運行會有所不同。問題是所有的任務都試圖寫入val的單個實例,并且他們正在踩著彼此的腳趾。我們稱這樣的類是線程不安全的。讓我們看看SingleThreadExecutor會發生什么: ```java // concurrent/SingleThreadExecutor3.java import java.util.concurrent.*; import java.util.stream.*; public class SingleThreadExecutor3 { public static void main(String[] args)throws InterruptedException { ExecutorService exec =Executors.newSingleThreadExecutor(); IntStream.range(0, 10) .mapToObj(InterferingTask::new) .forEach(exec::execute); exec.shutdown(); } } ``` 輸出結果: ``` 0 pool-1-thread-1 100 1 pool-1-thread-1 200 2 pool-1-thread-1 300 3 pool-1-thread-1 400 4 pool-1-thread-1 500 5 pool-1-thread-1 600 6 pool-1-thread-1 700 7 pool-1-thread-1 800 8 pool-1-thread-1 900 9 pool-1-thread-1 1000 ``` 現在我們每次都得到一致的結果,盡管**InterferingTask**缺乏線程安全性。這是SingleThreadExecutor的主要好處 - 因為它一次運行一個任務,這些任務不會相互干擾,因此強加了線程安全性。這種現象稱為線程封閉,因為在單線程上運行任務限制了它們的影響。線程封閉限制了加速,但可以節省很多困難的調試和重寫。 - 產生結果 因為**InterferingTask**是一個**Runnable**,它沒有返回值,因此只能使用副作用產生結果 - 操縱緩沖值而不是返回結果。副作用是并發編程中的主要問題之一,因為我們看到了**CachedThreadPool2.java**。**InterferingTask**中的**val**被稱為可變共享狀態,這就是問題所在:多個任務同時修改同一個變量會產生競爭。結果取決于首先在終點線上執行哪個任務,并修改變量(以及其他可能性的各種變化)。 避免競爭條件的最好方法是避免可變的共享狀態。我們可以稱之為自私的孩子原則:什么都不分享。 使用**InterferingTask**,最好刪除副作用并返回任務結果。為此,我們創建**Callable**而不是**Runnable**: ```java // concurrent/CountingTask.java import java.util.concurrent.*; public class CountingTask implements Callable<Integer> { final int id; public CountingTask(int id) { this.id = id; } @Override public Integer call() { Integer val = 0; for(int i = 0; i < 100; i++) val++; System.out.println(id + " " + Thread.currentThread().getName() + " " + val); return val; } } ``` **call()完全獨立于所有其他CountingTasks生成其結果**,這意味著沒有可變的共享狀態 **ExecutorService**允許你使用**invokeAll()**啟動集合中的每個Callable: ```java // concurrent/CachedThreadPool3.java import java.util.*; import java.util.concurrent.*; import java.util.stream.*; public class CachedThreadPool3 { public static Integer extractResult(Future<Integer> f) { try { return f.get(); } catch(Exception e) { throw new RuntimeException(e); } } public static void main(String[] args)throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); List<CountingTask> tasks = IntStream.range(0, 10) .mapToObj(CountingTask::new) .collect(Collectors.toList()); List<Future<Integer>> futures = exec.invokeAll(tasks); Integer sum = futures.stream() .map(CachedThreadPool3::extractResult) .reduce(0, Integer::sum); System.out.println("sum = " + sum); exec.shutdown(); } } ``` 輸出結果: ``` 1 pool-1-thread-2 100 0 pool-1-thread-1 100 4 pool-1-thread-5 100 5 pool-1-thread-6 100 8 pool-1-thread-9 100 9 pool-1-thread-10 100 2 pool-1-thread-3 100 3 pool-1-thread-4 100 6 pool-1-thread-7 100 7 pool-1-thread-8 100 sum = 1000 ``` 只有在所有任務完成后,**invokeAll()**才會返回一個**Future**列表,每個任務一個**Future**。**Future**是Java 5中引入的機制,允許你提交任務而無需等待它完成。在這里,我們使用**ExecutorService.submit()**: ```java // concurrent/Futures.java import java.util.*; import java.util.concurrent.*; import java.util.stream.*; public class Futures { public static void main(String[] args)throws InterruptedException, ExecutionException { ExecutorService exec =Executors.newSingleThreadExecutor(); Future<Integer> f = exec.submit(newCountingTask(99)); System.out.println(f.get()); // [1] exec.shutdown(); } } ``` 輸出結果: ``` 99 pool-1-thread-1 100 100 ``` - [1] 當你的任務在尚未完成的**Future**上調用**get()**時,調用會阻塞(等待)直到結果可用。 但這意味著,在**CachedThreadPool3.java**中,**Future**似乎是多余的,因為**invokeAll()**甚至在所有任務完成之前都不會返回。但是,這里的Future并不用于延遲結果,而是用于捕獲任何可能發生的異常。 還要注意在**CachedThreadPool3.java.get()**中拋出異常,因此**extractResult()**在Stream中執行此提取。 因為當你調用**get()**時,**Future**會阻塞,所以它只能解決等待任務完成才暴露問題。最終,**Futures**被認為是一種無效的解決方案,現在不鼓勵,我們推薦Java 8的**CompletableFuture**,這將在本章后面探討。當然,你仍會在遺留庫中遇到Futures。 我們可以使用并行Stream以更簡單,更優雅的方式解決這個問題: ```java // concurrent/CountingStream.java // {VisuallyInspectOutput} import java.util.*; import java.util.concurrent.*; import java.util.stream.*; public class CountingStream { public static void main(String[] args) { System.out.println( IntStream.range(0, 10) .parallel() .mapToObj(CountingTask::new) .map(ct -> ct.call()) .reduce(0, Integer::sum)); } } ``` 輸出結果: ``` 1 ForkJoinPool.commonPool-worker-3 100 8 ForkJoinPool.commonPool-worker-2 100 0 ForkJoinPool.commonPool-worker-6 100 2 ForkJoinPool.commonPool-worker-1 100 4 ForkJoinPool.commonPool-worker-5 100 9 ForkJoinPool.commonPool-worker-7 100 6 main 100 7 ForkJoinPool.commonPool-worker-4 100 5 ForkJoinPool.commonPool-worker-2 100 3 ForkJoinPool.commonPool-worker-3 100 1000 ``` 這不僅更容易理解,而且我們需要做的就是將 `parallel()` 插入到其他順序操作中,然后一切都在同時運行。 - Lambda和方法引用作為任務 在 **java8** 有了 **lambdas** 和方法引用,你不需要受限于只能使用 **Runnable** 和 **Callable** 。因為 java8 的**lambdas** 和方法引用可以通過匹配方法簽名來使用(即,它支持結構一致性),所以我們可以將非 **Runnable** 或 **Callable** 的參數傳遞給 `ExecutorService` : ```java // concurrent/LambdasAndMethodReferences.java import java.util.concurrent.*; class NotRunnable { public void go() { System.out.println("NotRunnable"); } } class NotCallable { public Integer get() { System.out.println("NotCallable"); return 1; } } public class LambdasAndMethodReferences { public static void main(String[] args)throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); exec.submit(() -> System.out.println("Lambda1")); exec.submit(new NotRunnable()::go); exec.submit(() -> { System.out.println("Lambda2"); return 1; }); exec.submit(new NotCallable()::get); exec.shutdown(); } } ``` 輸出結果: ``` Lambda1 NotCallable NotRunnable Lambda2 ``` 這里,前兩個**submit()**調用可以改為調用**execute()**。所有**submit()**調用都返回**Futures**,你可以在后兩次調用的情況下提取結果。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看