<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                <!-- Parallel Streams --> ## 并行流 Java 8流的一個顯著優點是,在某些情況下,它們可以很容易地并行化。這來自仔細的庫設計,特別是流使用內部迭代的方式 - 也就是說,它們控制著自己的迭代器。特別是,他們使用一種特殊的迭代器,稱為Spliterator,它被限制為易于自動分割。我們只需要念 `.parallel()` 就會產生魔法般的結果,流中的所有內容都作為一組并行任務運行。如果你的代碼是使用Streams編寫的,那么并行化以提高速度似乎是一種瑣事 例如,考慮來自Streams的Prime.java。查找質數可能是一個耗時的過程,我們可以看到該程序的計時: ```java // concurrent/ParallelPrime.java import java.util.*; import java.util.stream.*; import static java.util.stream.LongStream.*; import java.io.*; import java.nio.file.*; import onjava.Timer; public class ParallelPrime { static final int COUNT = 100_000; public static boolean isPrime(long n){ return rangeClosed(2, (long)Math.sqrt(n)).noneMatch(i -> n % i == 0); } public static void main(String[] args) throws IOException { Timer timer = new Timer(); List<String> primes = iterate(2, i -> i + 1) .parallel() // [1] .filter(ParallelPrime::isPrime) .limit(COUNT) .mapToObj(Long::toString) .collect(Collectors.toList()); System.out.println(timer.duration()); Files.write(Paths.get("primes.txt"), primes, StandardOpenOption.CREATE); } } ``` 輸出結果: ``` Output: 1224 ``` 請注意,這不是微基準測試,因為我們計時整個程序。我們將數據保存在磁盤上以防止編譯器過激的優化;如果我們沒有對結果做任何事情,那么一個高級的編譯器可能會觀察到程序沒有意義并且終止了計算(這不太可能,但并非不可能)。請注意使用nio2庫編寫文件的簡單性(在[文件](./17-Files.md)一章中有描述)。 當我注釋掉[1] parallel()行時,我的結果用時大約是parallel()的三倍。 并行流似乎是一個甜蜜的交易。你所需要做的就是將編程問題轉換為流,然后插入parallel()以加快速度。實際上,有時候這很容易。但遺憾的是,有許多陷阱。 - parallel()不是靈丹妙藥 作為對流和并行流的不確定性的探索,讓我們看一個看似簡單的問題:對增長的數字序列進行求和。事實證明有大量的方式去實現它,并且我將冒險用計時器將它們進行比較 - 我會盡量小心,但我承認我可能會在計時代碼執行時遇到許多基本陷阱之一。結果可能有一些缺陷(例如JVM沒有“熱身”),但我認為它仍然提供了一些有用的指示。 我將從一個計時方法**timeTest()**開始,它采用**LongSupplier**,測量**getAsLong()**調用的長度,將結果與**checkValue**進行比較并顯示結果。 請注意,一切都必須嚴格使用**long**;我花了一些時間發現隱蔽的溢出,然后才意識到在重要的地方錯過了**long**。 所有關于時間和內存的數字和討論都是指“我的機器”。你的經歷可能會有所不同。 ```java // concurrent/Summing.java import java.util.stream.*; import java.util.function.*; import onjava.Timer; public class Summing { static void timeTest(String id, long checkValue, LongSupplier operation){ System.out.print(id + ": "); Timer timer = new Timer(); long result = operation.getAsLong(); if(result == checkValue) System.out.println(timer.duration() + "ms"); else System.out.format("result: %d%ncheckValue: %d%n", result, checkValue); } public static final int SZ = 100_000_000; // This even works: // public static final int SZ = 1_000_000_000; public static final long CHECK = (long)SZ * ((long)SZ + 1)/2; // Gauss's formula public static void main(String[] args){ System.out.println(CHECK); timeTest("Sum Stream", CHECK, () -> LongStream.rangeClosed(0, SZ).sum()); timeTest("Sum Stream Parallel", CHECK, () -> LongStream.rangeClosed(0, SZ).parallel().sum()); timeTest("Sum Iterated", CHECK, () -> LongStream.iterate(0, i -> i + 1) .limit(SZ+1).sum()); // Slower & runs out of memory above 1_000_000: // timeTest("Sum Iterated Parallel", CHECK, () -> // LongStream.iterate(0, i -> i + 1) // .parallel() // .limit(SZ+1).sum()); } } ``` 輸出結果: ``` 5000000050000000 Sum Stream: 167ms Sum Stream Parallel: 46ms Sum Iterated: 284ms ``` **CHECK**值是使用Carl Friedrich Gauss(高斯)在1700年代后期還在上小學的時候創建的公式計算出來的. **main()** 的第一個版本使用直接生成 **Stream** 并調用 **sum()** 的方法。我們看到流的好處在于即使SZ為十億(1_000_000_000)程序也可以很好地處理而沒有溢出(為了讓程序運行得快一點,我使用了較小的數字)。使用 **parallel()** 的基本范圍操作明顯更快。 如果使用**iterate()**來生成序列,則減速是相當明顯的,可能是因為每次生成數字時都必須調用lambda。但是如果我們嘗試并行化,當**SZ**超過一百萬時,結果不僅比非并行版本花費的時間更長,而且也會耗盡內存(在某些機器上)。當然,當你可以使用**range()**時,你不會使用**iterate()**,但如果你生成的東西不是簡單的序列,你必須使用**iterate()**。應用**parallel()**是一個合理的嘗試,但會產生令人驚訝的結果。我們將在后面的部分中探討內存限制的原因,但我們可以對流并行算法進行初步觀察: - 流并行性將輸入數據分成多個部分,因此算法可以應用于那些單獨的部分。 - 數組分割成本低,分割均勻且對分割的大小有著完美的掌控。 - 鏈表沒有這些屬性;“拆分”一個鏈表僅僅意味著把它分成“第一元素”和“其余元素”,這相對無用。 - 無狀態生成器的行為類似于數組;上面使用的 **range()** 就是無狀態的。 - 迭代生成器的行為類似于鏈表; **iterate()** 是一個迭代生成器。 現在讓我們嘗試通過在數組中填充值并對數組求和來解決問題。因為數組只分配了一次,所以我們不太可能遇到垃圾收集時序問題。 首先我們將嘗試一個充滿原始**long**的數組: ```java // concurrent/Summing2.java // {ExcludeFromTravisCI}import java.util.*; public class Summing2 { static long basicSum(long[] ia) { long sum = 0; int size = ia.length; for(int i = 0; i < size; i++) sum += ia[i];return sum; } // Approximate largest value of SZ before // running out of memory on mymachine: public static final int SZ = 20_000_000; public static final long CHECK = (long)SZ * ((long)SZ + 1)/2; public static void main(String[] args) { System.out.println(CHECK); long[] la = newlong[SZ+1]; Arrays.parallelSetAll(la, i -> i); Summing.timeTest("Array Stream Sum", CHECK, () -> Arrays.stream(la).sum()); Summing.timeTest("Parallel", CHECK, () -> Arrays.stream(la).parallel().sum()); Summing.timeTest("Basic Sum", CHECK, () -> basicSum(la));// Destructive summation: Summing.timeTest("parallelPrefix", CHECK, () -> { Arrays.parallelPrefix(la, Long::sum); return la[la.length - 1]; }); } } ``` 輸出結果: ``` 200000010000000 Array Stream Sum: 104ms Parallel: 81ms Basic Sum: 106ms parallelPrefix: 265ms ``` 第一個限制是內存大小;因為數組是預先分配的,所以我們不能創建幾乎與以前版本一樣大的任何東西。并行化可以加快速度,甚至比使用 **basicSum()** 循環更快。有趣的是, **Arrays.parallelPrefix()** 似乎實際上減慢了速度。但是,這些技術中的任何一種在其他條件下都可能更有用 - 這就是為什么你不能做出任何確定性的聲明,除了“你必須嘗試一下”。 最后,考慮使用包裝類**Long**的效果: ```java // concurrent/Summing3.java // {ExcludeFromTravisCI} import java.util.*; public class Summing3 { static long basicSum(Long[] ia) { long sum = 0; int size = ia.length; for(int i = 0; i < size; i++) sum += ia[i]; return sum; } // Approximate largest value of SZ before // running out of memory on my machine: public static final int SZ = 10_000_000; public static final long CHECK = (long)SZ * ((long)SZ + 1)/2; public static void main(String[] args) { System.out.println(CHECK); Long[] aL = newLong[SZ+1]; Arrays.parallelSetAll(aL, i -> (long)i); Summing.timeTest("Long Array Stream Reduce", CHECK, () -> Arrays.stream(aL).reduce(0L, Long::sum)); Summing.timeTest("Long Basic Sum", CHECK, () -> basicSum(aL)); // Destructive summation: Summing.timeTest("Long parallelPrefix",CHECK, ()-> { Arrays.parallelPrefix(aL, Long::sum); return aL[aL.length - 1]; }); } } ``` 輸出結果: ``` 50000005000000 Long Array Stream Reduce: 1038ms Long Basic Sum: 21ms Long parallelPrefix: 3616ms ``` 現在可用的內存量大約減半,并且所有情況下所需的時間都會很長,除了**basicSum()**,它只是循環遍歷數組。令人驚訝的是, **Arrays.parallelPrefix()** 比任何其他方法都要花費更長的時間。 我將 **parallel()** 版本分開了,因為在上面的程序中運行它導致了一個冗長的垃圾收集,扭曲了結果: ```java // concurrent/Summing4.java // {ExcludeFromTravisCI} import java.util.*; public class Summing4 { public static void main(String[] args) { System.out.println(Summing3.CHECK); Long[] aL = newLong[Summing3.SZ+1]; Arrays.parallelSetAll(aL, i -> (long)i); Summing.timeTest("Long Parallel", Summing3.CHECK, () -> Arrays.stream(aL) .parallel() .reduce(0L,Long::sum)); } } ``` 輸出結果: ``` 50000005000000 Long Parallel: 1014ms ``` 它比非parallel()版本略快,但并不顯著。 導致時間增加的一個重要原因是處理器內存緩存。使用**Summing2.java**中的原始**long**,數組**la**是連續的內存。處理器可以更容易地預測該陣列的使用,并使緩存充滿下一個需要的陣列元素。訪問緩存比訪問主內存快得多。似乎 **Long parallelPrefix** 計算受到影響,因為它為每個計算讀取兩個數組元素,并將結果寫回到數組中,并且每個都為**Long**生成一個超出緩存的引用。 使用**Summing3.java**和**Summing4.java**,**aL**是一個**Long**數組,它不是一個連續的數據數組,而是一個連續的**Long**對象引用數組。盡管該數組可能會在緩存中出現,但指向的對象幾乎總是不在緩存中。 這些示例使用不同的SZ值來顯示內存限制。 為了進行時間比較,以下是SZ設置為最小值1000萬的結果: **Sum Stream: 69msSum Stream Parallel: 18msSum Iterated: 277ms Array Stream Sum: 57ms Parallel: 14ms Basic Sum: 16ms parallelPrefix: 28ms Long Array Stream Reduce: 1046ms Long Basic Sum: 21ms Long parallelPrefix: 3287ms Long Parallel: 1008ms** 雖然Java 8的各種內置“并行”工具非常棒,但我認為它們被視為神奇的靈丹妙藥:“只需添加parallel()并且它會更快!” 我希望我已經開始表明情況并非所有都是如此,并且盲目地應用內置的“并行”操作有時甚至會使運行速度明顯變慢。 - parallel()/limit()交點 使用**parallel()**時會有更復雜的問題。從其他語言中吸取的流機制被設計為大約是一個無限的流模型。如果你擁有有限數量的元素,則可以使用集合以及為有限大小的集合設計的關聯算法。如果你使用無限流,則使用針對流優化的算法。 Java 8將兩者合并起來。例如,**Collections**沒有內置的**map()**操作。在**Collection**和**Map**中唯一類似流的批處理操作是**forEach()**。如果要執行**map()**和**reduce()**等操作,必須首先將**Collection**轉換為存在這些操作的**Stream**: ```java // concurrent/CollectionIntoStream.java import onjava.*; import java.util.*; import java.util.stream.*; public class CollectionIntoStream { public static void main(String[] args) { List<String> strings = Stream.generate(new Rand.String(5)) .limit(10) .collect(Collectors.toList()); strings.forEach(System.out::println); // Convert to a Stream for many more options: String result = strings.stream() .map(String::toUpperCase) .map(s -> s.substring(2)) .reduce(":", (s1, s2) -> s1 + s2); System.out.println(result); } } ``` 輸出結果: ``` btpen pccux szgvg meinn eeloz tdvew cippc ygpoa lkljl bynxt :PENCUXGVGINNLOZVEWPPCPOALJLNXT ``` **Collection**確實有一些批處理操作,如**removeAll()**,**removeIf()**和**retainAll()**,但這些都是破壞性的操作。**ConcurrentHashMap**對**forEach**和**reduce**操作有特別廣泛的支持。 在許多情況下,只在集合上調用**stream()**或者**parallelStream()**沒有問題。但是,有時將**Stream**與**Collection**混合會產生意想不到的結果。這是一個有趣的難題: ```java // concurrent/ParallelStreamPuzzle.java import java.util.*; import java.util.function.*; import java.util.stream.*; public class ParallelStreamPuzzle { static class IntGenerator implements Supplier<Integer> { private int current = 0; @Override public Integer get() { return current++; } } public static void main(String[] args) { List<Integer> x = Stream.generate(newIntGenerator()) .limit(10) .parallel() // [1] .collect(Collectors.toList()); System.out.println(x); } } /* Output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] */ ``` 如果[1]注釋運行它,它會產生預期的: **[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]** 每次。但是包含了parallel(),它看起來像一個隨機數生成器,帶有輸出(從一次運行到下一次運行不同),如: **[0, 3, 6, 8, 11, 14, 17, 20, 23, 26]** 這樣一個簡單的程序怎么會如此糟糕呢?讓我們考慮一下我們在這里要實現的目標:“并行生成。”那意味著什么?一堆線程都在從一個生成器取值,然后以某種方式選擇有限的結果集?代碼看起來很簡單,但它變成了一個特別棘手的問題。 為了看到它,我們將添加一些儀器。由于我們正在處理線程,因此我們必須將任何跟蹤信息捕獲到并發數據結構中。在這里我使用**ConcurrentLinkedDeque**: ```java // concurrent/ParallelStreamPuzzle2.java import java.util.*; import java.util.function.*; import java.util.stream.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.nio.file.*; public class ParallelStreamPuzzle2 { public static final Deque<String> TRACE = new ConcurrentLinkedDeque<>(); static class IntGenerator implements Supplier<Integer> { private AtomicInteger current = new AtomicInteger(); @Override public Integer get() { TRACE.add(current.get() + ": " +Thread.currentThread().getName()); return current.getAndIncrement(); } } public static void main(String[] args) throws Exception { List<Integer> x = Stream.generate(newIntGenerator()) .limit(10) .parallel() .collect(Collectors.toList()); System.out.println(x); Files.write(Paths.get("PSP2.txt"), TRACE); } } ``` 輸出結果: ``` [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] ``` current是使用線程安全的 **AtomicInteger** 類定義的,可以防止競爭條件;**parallel()**允許多個線程調用**get()**。 在查看 **PSP2.txt**.**IntGenerator.get()** 被調用1024次時,你可能會感到驚訝。 **0: main 1: ForkJoinPool.commonPool-worker-1 2: ForkJoinPool.commonPool-worker-2 3: ForkJoinPool.commonPool-worker-2 4: ForkJoinPool.commonPool-worker-1 5: ForkJoinPool.commonPool-worker-1 6: ForkJoinPool.commonPool-worker-1 7: ForkJoinPool.commonPool-worker-1 8: ForkJoinPool.commonPool-worker-4 9: ForkJoinPool.commonPool-worker-4 10: ForkJoinPool.commonPool-worker-4 11: main 12: main 13: main 14: main 15: main...10 17: ForkJoinPool.commonPool-worker-110 18: ForkJoinPool.commonPool-worker-610 19: ForkJoinPool.commonPool-worker-610 20: ForkJoinPool.commonPool-worker-110 21: ForkJoinPool.commonPool-worker-110 22: ForkJoinPool.commonPool-worker-110 23: ForkJoinPool.commonPool-worker-1** 這個塊大小似乎是內部實現的一部分(嘗試使用`limit()` 的不同參數來查看不同的塊大小)。將`parallel()`與`limit()`結合使用可以預取一串值,作為流輸出。 試著想象一下這里發生了什么:一個流抽象出無限序列,按需生成。當你要求它并行產生流時,你要求所有這些線程盡可能地調用`get()`。添加`limit()`,你說“只需要這些。”基本上,當你為了隨機輸出而選擇將`parallel()`與`limit()`結合使用時,這種方法可能對你正在解決的問題有效。但是當你這樣做時,你必須明白。這是一個僅限專家的功能,而不是要爭辯說“Java弄錯了”。 什么是更合理的方法來解決問題?好吧,如果你想生成一個int流,你可以使用IntStream.range(),如下所示: ```java // concurrent/ParallelStreamPuzzle3.java // {VisuallyInspectOutput} import java.util.*; import java.util.stream.*; public class ParallelStreamPuzzle3 { public static void main(String[] args) { List<Integer> x = IntStream.range(0, 30) .peek(e -> System.out.println(e + ": " +Thread.currentThread() .getName())) .limit(10) .parallel() .boxed() .collect(Collectors.toList()); System.out.println(x); } } ``` 輸出結果: ``` 8: main 6: ForkJoinPool.commonPool-worker-5 3: ForkJoinPool.commonPool-worker-7 5: ForkJoinPool.commonPool-worker-5 1: ForkJoinPool.commonPool-worker-3 2: ForkJoinPool.commonPool-worker-6 4: ForkJoinPool.commonPool-worker-1 0: ForkJoinPool.commonPool-worker-4 7: ForkJoinPool.commonPool-worker-1 9: ForkJoinPool.commonPool-worker-2 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] ``` 為了表明**parallel()**確實有效,我添加了一個對**peek()**的調用,這是一個主要用于調試的流函數:它從流中提取一個值并執行某些操作但不影響從流向下傳遞的元素。注意這會干擾線程行為,但我只是嘗試在這里做一些事情,而不是實際調試任何東西。 你還可以看到**boxed()**的添加,它接受**int**流并將其轉換為**Integer**流。 現在我們得到多個線程產生不同的值,但它只產生10個請求的值,而不是1024個產生10個值。 它更快嗎?一個更好的問題是:什么時候開始有意義?當然不是這么小的一套;上下文切換的代價遠遠超過并行性的任何加速。很難想象什么時候用并行生成一個簡單的數字序列會有意義。如果你要生成的東西需要很高的成本,它可能有意義 - 但這都是猜測。只有通過測試我們才能知道用并行是否有效。記住這句格言:“首先使它工作,然后使它更快地工作 - 只有當你必須這樣做時。”**parallel()**和**limit()**僅供專家使用(把話說在前面,我不認為自己是這里的專家)。 - 并行流只看起來很容易 實際上,在許多情況下,并行流確實可以毫不費力地更快地產生結果。但正如你所見,僅僅將**parallel()**加到你的Stream操作上并不一定是安全的事情。在使用**parallel()**之前,你必須了解并行性如何幫助或損害你的操作。一個基本認知錯誤就是認為使用并行性總是一個好主意。事實上并不是。Stream意味著你不需要重寫所有代碼便可以并行運行它。但是流的出現并不意味著你可以不用理解并行的原理以及不用考慮并行是否真的有助于實現你的目標。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看