<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                # Fork/Join 框架教程:`ForkJoinPool`示例 > 原文: [https://howtodoinjava.com/java7/forkjoin-framework-tutorial-forkjoinpool-example/](https://howtodoinjava.com/java7/forkjoin-framework-tutorial-forkjoinpool-example/) 在 Java 程序中有效使用并行內核一直是一個挑戰。 很少有本地框架可以將工作分配到多個核心,然后將它們加入以返回結果集。 Java 7 已將此特性作為 [**Fork 和 Join 框架**](https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html "forkjoin")合并。 基本上, **Fork-Join 將手頭的任務分解為多個微型任務**,直到微型任務足夠簡單,可以解決而無需進一步分解。 **就像[分而治之算法](https://en.wikipedia.org/wiki/Divide_and_conquer_algorithms "Divide_and_conquer_algorithms")**一樣。 在此框架中要注意的一個重要概念是**,理想情況下,沒有工作線程處于空閑狀態**。 他們實現了**工作竊取算法**,因為空閑工作器`steal`從忙碌的工作器那里進行工作。 ![Fork Join Framework](https://img.kancloud.cn/7f/36/7f3647dbc62233ed996fc5c709bffa14_555x179.png) ForkJoin 框架 它基于 Java 并發性思想領袖 Doug Lea 的工作。 Fork/Join 處理線程的麻煩; 您只需要向框架指出可以分解并遞歸處理的部分。 它采用偽代碼(摘自 Doug Lea 關于該主題的論文): ```java Result solve(Problem problem) { if (problem is small) directly solve problem else { split problem into independent parts fork new subtasks to solve each part join all subtasks compose result from subresults } } ``` ```java Discussion Points 1) Core Classes used in Fork/Join Framework i) ForkJoinPool ii) ForkJoinTask 2) Example Implementations of Fork/Join Pool Framework i) Implementation Sourcecode ii) How it works? 3) Difference between Fork/Join Framework And ExecutorService 4) Existing Implementations in JDK 5) Conclusion ``` ## Fork/Join 框架中使用的核心類 支持 Fork-Join 機制的核心類是[`ForkJoinPool`](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html "ForkJoinPool")和[`ForkJoinTask`](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinTask.html "ForkJoinTask")。 讓我們詳細了解他們的角色。 ## `ForkJoinPool` `ForkJoinPool`基本上是`ExecutorService`的一種特殊實現,用于實現我們上面討論的竊取算法。 我們通過提供目標并行度,即**處理器**的數量,來創建`ForkJoinPool`的實例,如下所示: ```java ForkJoinPool pool = new ForkJoinPool(numberOfProcessors); Where numberOfProcessors = Runtime.getRunTime().availableProcessors(); ``` 如果使用無參數構造函數,則默認情況下,它將創建一個大小等于使用上述技術獲得的可用處理器數量的大小的池。 盡管您指定了任何初始池大小,但**池會動態調整其大小,以嘗試在任何給定時間點維護足夠的活動線程**。 與其他`ExecutorService`相比,另一個重要區別是該程序池無需在程序退出時顯式關閉,因為其所有線程均處于守護程序模式。 向`ForkJoinPool`提交任務的方式有三種。 **1)`execute()`方法**:所需的異步執行; 調用其`fork`方法在多個線程之間分配工作。 **2)`invoke()`方法**:等待獲得結果; 在池上調用`invoke`方法。 **3)`Submit()`方法**:返回一個`Future`對象,可用于檢查狀態并在完成時獲取結果。 ## `ForkJoinTask` 這是用于創建在`ForkJoinPool`中運行的任務的抽象類。 `Recursiveaction`和`RecursiveTask`是`ForkJoinTask`的僅有的兩個直接的已知子類。 這兩類之間的唯一區別是`RecursiveAction`不返回值,而`RecursiveTask`確實具有返回值并返回指定類型的對象。 > 在這兩種情況下,您都需要在子類中實現`compute`方法,該方法執行任務所需的主要計算。 `ForkJoinTask`類提供了幾種檢查任務執行狀態的方法。 如果任務以任何方式完成,則`isDone()`方法返回`true`。 如果任務未取消就完成或沒有遇到異常,則`isCompletedNormally()`方法返回`true`;如果任務被取消,則`isCancelled()`返回`true`。 最后,如果任務被取消或遇到異常,則`isCompletedabnormally()`返回`true`。 ## `ForkJoinPool`框架的示例實現 在此示例中,您將學習如何使用`ForkJoinPool`和`ForkJoinTask`類提供的異步方法來管理任務。 您將實現**程序,該程序將在文件夾及其子文件夾**中搜索具有確定擴展名的文件。 您將要實現的`ForkJoinTask`類將處理文件夾的內容。 對于該文件夾中的每個子文件夾,它將以異步方式將新任務發送到`ForkJoinPool`類。 對于該文件夾中的每個文件,任務將檢查文件的擴展名并將其繼續添加到結果列表中。 上述問題的解決方案在`FolderProcessor`類中實現,如下所示: ## 實現的源代碼 **`FolderProcessor.java`** ```java package forkJoinDemoAsyncExample; import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.RecursiveTask; public class FolderProcessor extends RecursiveTask<List<String>> { private static final long serialVersionUID = 1L; //This attribute will store the full path of the folder this task is going to process. private final String path; //This attribute will store the name of the extension of the files this task is going to look for. private final String extension; //Implement the constructor of the class to initialize its attributes public FolderProcessor(String path, String extension) { this.path = path; this.extension = extension; } //Implement the compute() method. As you parameterized the RecursiveTask class with the List<String> type, //this method has to return an object of that type. @Override protected List<String> compute() { //List to store the names of the files stored in the folder. List<String> list = new ArrayList<String>(); //FolderProcessor tasks to store the subtasks that are going to process the subfolders stored in the folder List<FolderProcessor> tasks = new ArrayList<FolderProcessor>(); //Get the content of the folder. File file = new File(path); File content[] = file.listFiles(); //For each element in the folder, if there is a subfolder, create a new FolderProcessor object //and execute it asynchronously using the fork() method. if (content != null) { for (int i = 0; i < content.length; i++) { if (content[i].isDirectory()) { FolderProcessor task = new FolderProcessor(content[i].getAbsolutePath(), extension); task.fork(); tasks.add(task); } //Otherwise, compare the extension of the file with the extension you are looking for using the checkFile() method //and, if they are equal, store the full path of the file in the list of strings declared earlier. else { if (checkFile(content[i].getName())) { list.add(content[i].getAbsolutePath()); } } } } //If the list of the FolderProcessor subtasks has more than 50 elements, //write a message to the console to indicate this circumstance. if (tasks.size() > 50) { System.out.printf("%s: %d tasks ran.\n", file.getAbsolutePath(), tasks.size()); } //add to the list of files the results returned by the subtasks launched by this task. addResultsFromTasks(list, tasks); //Return the list of strings return list; } //For each task stored in the list of tasks, call the join() method that will wait for its finalization and then will return the result of the task. //Add that result to the list of strings using the addAll() method. private void addResultsFromTasks(List<String> list, List<FolderProcessor> tasks) { for (FolderProcessor item : tasks) { list.addAll(item.join()); } } //This method compares if the name of a file passed as a parameter ends with the extension you are looking for. private boolean checkFile(String name) { return name.endsWith(extension); } } ``` 并在`FolderProcessor`以上使用,請遵循以下代碼: **`Main.java`** ```java package forkJoinDemoAsyncExample; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) { //Create ForkJoinPool using the default constructor. ForkJoinPool pool = new ForkJoinPool(); //Create three FolderProcessor tasks. Initialize each one with a different folder path. FolderProcessor system = new FolderProcessor("C:\\Windows", "log"); FolderProcessor apps = new FolderProcessor("C:\\Program Files", "log"); FolderProcessor documents = new FolderProcessor("C:\\Documents And Settings", "log"); //Execute the three tasks in the pool using the execute() method. pool.execute(system); pool.execute(apps); pool.execute(documents); //Write to the console information about the status of the pool every second //until the three tasks have finished their execution. do { System.out.printf("******************************************\n"); System.out.printf("Main: Parallelism: %d\n", pool.getParallelism()); System.out.printf("Main: Active Threads: %d\n", pool.getActiveThreadCount()); System.out.printf("Main: Task Count: %d\n", pool.getQueuedTaskCount()); System.out.printf("Main: Steal Count: %d\n", pool.getStealCount()); System.out.printf("******************************************\n"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } while ((!system.isDone()) || (!apps.isDone()) || (!documents.isDone())); //Shut down ForkJoinPool using the shutdown() method. pool.shutdown(); //Write the number of results generated by each task to the console. List<String> results; results = system.join(); System.out.printf("System: %d files found.\n", results.size()); results = apps.join(); System.out.printf("Apps: %d files found.\n", results.size()); results = documents.join(); System.out.printf("Documents: %d files found.\n", results.size()); } } ``` 上面程序的輸出將如下所示: ```java Main: Parallelism: 2 Main: Active Threads: 3 Main: Task Count: 1403 Main: Steal Count: 5551 ****************************************** ****************************************** Main: Parallelism: 2 Main: Active Threads: 3 Main: Task Count: 586 Main: Steal Count: 5551 ****************************************** System: 337 files found. Apps: 10 files found. Documents: 0 files found. ``` ## 怎么運行的? 在`FolderProcessor`類中,每個任務都處理文件夾的內容。 如您所知,此內容包含以下兩種元素: * 檔案 * 其他文件夾 如果任務找到文件夾,它將創建另一個`Task`對象來處理該文件夾,并使用`fork()`方法將其發送到池中。 如果該任務具有空閑的工作線程或可以創建新的工作線程,則此方法會將任務發送到執行該任務的池。 **方法將立即返回,因此任務可以繼續處理**文件夾的內容。 對于每個文件,任務都會將其擴展名與要查找的擴展名進行比較,如果它們相等,則將文件名添加到結果列表中。 任務處理完分配的文件夾的所有內容后,它將等待使用`join()`方法完成發送給池的所有任務的完成。 在任務中調用的此方法等待其執行完成,并返回`compute()`方法返回的值。 該任務將其發送的所有任務的結果與自己的結果分組,并將該列表作為`compute()`方法的返回值返回。 ## Fork/Join 框架和`ExecutorService`之間的區別 Fork/Join 和`Executor`框架之間的**主要區別是工作竊取算法**。 與`Executor`框架不同,當任務正在等待使用 join 操作創建的子任務完成時,正在執行該任務的線程(稱為工作器線程)將尋找尚未執行的其他任務并開始執行它。 通過這種方式,線程可以充分利用其運行時間,從而提高了應用程序的性能。 ## JDK 中的現有實現 Java SE 中有一些通常有用的特性,已經使用 Fork/Join 框架實現了。 1)Java SE 8 中引入的一種此類實現由`java.util.Arrays`類用于其`parallelSort()`方法。 這些方法類似于`sort()`,但是通過 Fork/Join 框架利用并發性。 在多處理器系統上運行時,大型數組的并行排序比順序排序要快。 2)在`Stream.parallel()`中使用的并行性。 閱讀有關 Java 8 中此[并行流操作的更多信息](//howtodoinjava.com/java8/java-8-tutorial-streams-by-examples/ "parallel strems")。 ## 總結 設計好的多線程算法很困難,并且**Fork/Join并非在每種情況下都有效**。 它在其自身的適用范圍內非常有用,但是最后,您必須確定您的問題是否適合該框架,否則,**您必須準備在`java.util.concurrent`包提供的一流工具基礎上開發自己的解決方案**。 **參考** [http://gee.cs.oswego.edu/dl/papers/fj.pdf](http://gee.cs.oswego.edu/dl/papers/fj.pdf) [http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html](https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html) [http://www.packtpub.com/java-7-concurrency-cookbook/book](https://www.packtpub.com/java-7-concurrency-cookbook/book) **祝您學習愉快!**
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看