這個示例驅動的教程是Java8數據流(Stream)的深入總結。當我第一次看到`Stream`API時,我非常疑惑,因為它聽起來和Java IO的`InputStream`和`OutputStream`一樣。但是Java8的數據流是完全不同的東西。數據流是單體(Monad),并且在Java8函數式編程中起到重要作用。
> 在函數式編程中,單體是一個結構,表示定義為步驟序列的計算。單體結構的類型定義了它對鏈式操作,或具有相同類型的嵌套函數的含義。
這個教程教給你如何使用Java8數據流,以及如何使用不同種類的可用的數據流操作。你將會學到處理次序以及流操作的次序如何影響運行時效率。這個教程也會詳細講解更加強大的流操作,`reduce`、`collect`和`flatMap`。最后,這個教程會深入探討并行流。
如果你還不熟悉Java8的lambda表達式,函數式接口和方法引用,你可能需要在開始這一章之前,首先閱讀我的[Java8教程](https://github.com/wizardforcel/modern-java-zh/blob/master/ch1.md)。
更新 - 我現在正在編寫用于瀏覽器的Java8數據流API的JavaScript實現。如果你對此感興趣,請在Github上訪問[Stream.js](https://github.com/winterbe/streamjs)。非常期待你的反饋。
## [](https://github.com/wizardforcel/modern-java-zh/blob/master/ch2.md#數據流如何工作)數據流如何工作
數據流表示元素的序列,并支持不同種類的操作來執行元素上的計算:
~~~java
List<String> myList =
Arrays.asList("a1", "a2", "b1", "c2", "c1");
myList
.stream()
.filter(s -> s.startsWith("c"))
.map(String::toUpperCase)
.sorted()
.forEach(System.out::println);
// C1
// C2
~~~
數據流操作要么是銜接操作,要么是終止操作。銜接操作返回數據流,所以我們可以把多個銜接操作不使用分號來鏈接到一起。終止操作無返回值,或者返回一個不是流的結果。在上面的例子中,`filter`、`map`和`sorted`都是銜接操作,而`forEach`是終止操作。列表上的所有流式操作請見[數據流的Javadoc](http://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html)。你在上面例子中看到的這種數據流的鏈式操作也叫作操作流水線。
多數數據流操作都接受一些lambda表達式參數,函數式接口用來指定操作的具體行為。這些操作的大多數必須是無干擾而且是無狀態的。它們是什么意思呢?
當一個函數不修改數據流的底層數據源,它就是[無干擾的](http://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#NonInterference)。例如,在上面的例子中,沒有任何lambda表達式通過添加或刪除集合元素修改`myList`。
當一個函數的操作的執行是確定性的,它就是[無狀態的](http://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#Statelessness)。例如,在上面的例子中,沒有任何lambda表達式依賴于外部作用域中任何在操作過程中可變的變量或狀態。
## [](https://github.com/wizardforcel/modern-java-zh/blob/master/ch2.md#數據流的不同類型)數據流的不同類型
數據流可以從多種數據源創建,尤其是集合。`List`和`Set`支持新方法`stream()`和`parallelStream()`,來創建串行流或并行流。并行流能夠在多個線程上執行操作,它們會在之后的章節中講到。我們現在來看看串行流:
~~~java
Arrays.asList("a1", "a2", "a3")
.stream()
.findFirst()
.ifPresent(System.out::println); // a1
~~~
在對象列表上調用`stream()`方法會返回一個通常的對象流。但是我們不需要創建一個集合來創建數據流,就像下面那樣:
~~~java
Stream.of("a1", "a2", "a3")
.findFirst()
.ifPresent(System.out::println); // a1
~~~
只要使用`Stream.of()`,就可以從一系列對象引用中創建數據流。
除了普通的對象數據流,Java8還自帶了特殊種類的流,用于處理基本數據類型`int`、`long`和`double`。你可能已經猜到了它是`IntStream`、`LongStream`和`DoubleStream`。
`IntStream`可以使用`IntStream.range()`替換通常的`for`循環:
~~~java
IntStream.range(1, 4)
.forEach(System.out::println);
// 1
// 2
// 3
~~~
所有這些基本數據流都像通常的對象數據流一樣,但有一些不同。基本的數據流使用特殊的lambda表達式,例如,`IntFunction`而不是`Function`,`IntPredicate`而不是`Predicate`。而且基本數據流支持額外的聚合終止操作`sum()`和`average()`:
~~~java
Arrays.stream(new int[] {1, 2, 3})
.map(n -> 2 * n + 1)
.average()
.ifPresent(System.out::println); // 5.0
~~~
有時需要將通常的對象數據流轉換為基本數據流,或者相反。出于這種目的,對象數據流支持特殊的映射操作`mapToInt()`、`mapToLong()`和`mapToDouble()`:
~~~java
Stream.of("a1", "a2", "a3")
.map(s -> s.substring(1))
.mapToInt(Integer::parseInt)
.max()
.ifPresent(System.out::println); // 3
~~~
基本數據流可以通過`mapToObj()`轉換為對象數據流:
~~~java
IntStream.range(1, 4)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);
// a1
// a2
// a3
~~~
下面是組合示例:浮點數據流首先映射為整數數據流,之后映射為字符串的對象數據流:
~~~java
Stream.of(1.0, 2.0, 3.0)
.mapToInt(Double::intValue)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);
// a1
// a2
// a3
~~~
## [](https://github.com/wizardforcel/modern-java-zh/blob/master/ch2.md#處理順序)處理順序
既然我們已經了解了如何創建并使用不同種類的數據流,讓我們深入了解數據流操作在背后如何執行吧。
銜接操作的一個重要特性就是延遲性。觀察下面沒有終止操作的例子:
~~~java
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
});
~~~
執行這段代碼時,不向控制臺打印任何東西。這是因為銜接操作只在終止操作調用時被執行。
讓我們通過添加終止操作`forEach`來擴展這個例子:
~~~java
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
})
.forEach(s -> System.out.println("forEach: " + s));
~~~
執行這段代碼會得到如下輸出:
~~~
filter: d2
forEach: d2
filter: a2
forEach: a2
filter: b1
forEach: b1
filter: b3
forEach: b3
filter: c
forEach: c
~~~
結果的順序可能出人意料。原始的方法會在數據流的所有元素上,一個接一個地水平執行所有操作。但是每個元素在調用鏈上垂直移動。第一個字符串`"d2"`首先經過`filter`然后是`forEach`,執行完后才開始處理第二個字符串`"a2"`。
這種行為可以減少每個元素上所執行的實際操作數量,就像我們在下個例子中看到的那樣:
~~~java
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.anyMatch(s -> {
System.out.println("anyMatch: " + s);
return s.startsWith("A");
});
// map: d2
// anyMatch: D2
// map: a2
// anyMatch: A2
~~~
只要提供的數據元素滿足了謂詞,`anyMatch`操作就會返回`true`。對于第二個傳遞`"A2"`的元素,它的結果為真。由于數據流的鏈式調用是垂直執行的,`map`這里只需要執行兩次。所以`map`會執行盡可能少的次數,而不是把所有元素都映射一遍。
### [](https://github.com/wizardforcel/modern-java-zh/blob/master/ch2.md#為什么順序如此重要)為什么順序如此重要
下面的例子由兩個銜接操作`map`和`filter`,以及一個終止操作`forEach`組成。讓我們再來看看這些操作如何執行:
~~~java
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("A");
})
.forEach(s -> System.out.println("forEach: " + s));
// map: d2
// filter: D2
// map: a2
// filter: A2
// forEach: A2
// map: b1
// filter: B1
// map: b3
// filter: B3
// map: c
// filter: C
~~~
就像你可能猜到的那樣,`map`和`filter`會對底層集合的每個字符串調用五次,而`forEach`只會調用一次。
如果我們調整操作順序,將`filter`移動到調用鏈的頂端,就可以極大減少操作的執行次數:
~~~java
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
// filter: d2
// filter: a2
// map: a2
// forEach: A2
// filter: b1
// filter: b3
// filter: c
~~~
現在,`map`只會調用一次,所以操作流水線對于更多的輸入元素會執行更快。在整合復雜的方法鏈時,要記住這一點。
讓我們通過添加額外的方法`sorted`來擴展上面的例子:
~~~java
Stream.of("d2", "a2", "b1", "b3", "c")
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
~~~
排序是一類特殊的銜接操作。它是有狀態的操作,因為你需要在處理中保存狀態來對集合中的元素排序。
執行這個例子會得到如下輸入:
~~~java
sort: a2; d2
sort: b1; a2
sort: b1; d2
sort: b1; a2
sort: b3; b1
sort: b3; d2
sort: c; b3
sort: c; d2
filter: a2
map: a2
forEach: A2
filter: b1
filter: b3
filter: c
filter: d2
~~~
首先,排序操作在整個輸入集合上執行。也就是說,`sorted`以水平方式執行。所以這里`sorted`對輸入集合中每個元素的多種組合調用了八次。
我們同樣可以通過重排調用鏈來優化性能:
~~~java
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
// filter: d2
// filter: a2
// filter: b1
// filter: b3
// filter: c
// map: a2
// forEach: A2
~~~
這個例子中`sorted`永遠不會調用,因為`filter`把輸入集合減少至只有一個元素。所以對于更大的輸入集合會極大提升性能。
## [](https://github.com/wizardforcel/modern-java-zh/blob/master/ch2.md#復用數據流)復用數據流
Java8的數據流不能被復用。一旦你調用了任何終止操作,數據流就關閉了:
~~~java
Stream<String> stream =
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));
stream.anyMatch(s -> true); // ok
stream.noneMatch(s -> true); // exception
~~~
在相同數據流上,在`anyMatch`之后調用`noneMatch`會產生下面的異常:
~~~
java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
at com.winterbe.java8.Streams5.test7(Streams5.java:38)
at com.winterbe.java8.Streams5.main(Streams5.java:28)
~~~
要克服這個限制,我們需要為每個我們想要執行的終止操作創建新的數據流調用鏈。例如,我們創建一個數據流供應器,來構建新的數據流,并且設置好所有銜接操作:
~~~java
Supplier<Stream<String>> streamSupplier =
() -> Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));
streamSupplier.get().anyMatch(s -> true); // ok
streamSupplier.get().noneMatch(s -> true); // ok
~~~
每次對`get()`的調用都構造了一個新的數據流,我們將其保存來調用終止操作。
## [](https://github.com/wizardforcel/modern-java-zh/blob/master/ch2.md#高級操作)高級操作
數據流執行大量的不同操作。我們已經了解了一些最重要的操作,例如`filter`和`map`。我將它們留給你來探索所有其他的可用操作(請見[數據流的Javadoc](http://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html))。下面讓我們深入了解一些更復雜的操作:`collect`、`flatMap`和`reduce`。
這一節的大部分代碼示例使用下面的`Person`列表來演示:
~~~java
class Person {
String name;
int age;
Person(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return name;
}
}
List<Person> persons =
Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));
~~~
### [](https://github.com/wizardforcel/modern-java-zh/blob/master/ch2.md#collect)`collect`
`collect`是非常有用的終止操作,將流中的元素存放在不同類型的結果中,例如`List`、`Set`或者`Map`。`collect`接受收集器(Collector),它由四個不同的操作組成:供應器(supplier)、累加器(accumulator)、組合器(combiner)和終止器(finisher)。這在開始聽起來十分復雜,但是Java8通過內置的`Collectors`類支持多種內置的收集器。所以對于大部分常見操作,你并不需要自己實現收集器。
讓我們以一個非常常見的用例來開始:
~~~java
List<Person> filtered =
persons
.stream()
.filter(p -> p.name.startsWith("P"))
.collect(Collectors.toList());
System.out.println(filtered); // [Peter, Pamela]
~~~
就像你看到的那樣,它非常簡單,只是從流的元素中構造了一個列表。如果需要以`Set`來替代`List`,只需要使用`Collectors.toSet()`就好了。
下面的例子按照年齡對所有人進行分組:
~~~java
Map<Integer, List<Person>> personsByAge = persons
.stream()
.collect(Collectors.groupingBy(p -> p.age));
personsByAge
.forEach((age, p) -> System.out.format("age %s: %s\n", age, p));
// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]
~~~
收集器十分靈活。你也可以在流的元素上執行聚合,例如,計算所有人的平均年齡:
~~~java
Double averageAge = persons
.stream()
.collect(Collectors.averagingInt(p -> p.age));
System.out.println(averageAge); // 19.0
~~~
如果你對更多統計學方法感興趣,概要收集器返回一個特殊的內置概要統計對象,所以我們可以簡單計算最小年齡、最大年齡、算術平均年齡、總和和數量。
~~~java
IntSummaryStatistics ageSummary =
persons
.stream()
.collect(Collectors.summarizingInt(p -> p.age));
System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}
~~~
下面的例子將所有人連接為一個字符串:
~~~java
String phrase = persons
.stream()
.filter(p -> p.age >= 18)
.map(p -> p.name)
.collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));
System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.
~~~
連接收集器接受分隔符,以及可選的前綴和后綴。
為了將數據流中的元素轉換為映射,我們需要指定鍵和值如何被映射。要記住鍵必須是唯一的,否則會拋出`IllegalStateException`異常。你可以選擇傳遞一個合并函數作為額外的參數來避免這個異常。
既然我們知道了一些最強大的內置收集器,讓我們來嘗試構建自己的特殊收集器吧。我們希望將流中的所有人轉換為一個字符串,包含所有大寫的名稱,并以`|`分割。為了完成它,我們通過`Collector.of()`創建了一個新的收集器。我們需要傳遞一個收集器的四個組成部分:供應器、累加器、組合器和終止器。
~~~java
Collector<Person, StringJoiner, String> personNameCollector =
Collector.of(
() -> new StringJoiner(" | "), // supplier
(j, p) -> j.add(p.name.toUpperCase()), // accumulator
(j1, j2) -> j1.merge(j2), // combiner
StringJoiner::toString); // finisher
String names = persons
.stream()
.collect(personNameCollector);
System.out.println(names); // MAX | PETER | PAMELA | DAVID
~~~
由于Java中的字符串是不可變的,我們需要一個助手類`StringJointer`。讓收集器構造我們的字符串。供應器最開始使用相應的分隔符構造了這樣一個`StringJointer`。累加器用于將每個人的大寫名稱加到`StringJointer`中。組合器知道如何把兩個`StringJointer`合并為一個。最后一步,終結器從`StringJointer`構造出預期的字符串。
### [](https://github.com/wizardforcel/modern-java-zh/blob/master/ch2.md#flatmap)`flatMap`
我們已經了解了如何通過使用`map`操作,將流中的對象轉換為另一種類型。`map`有時十分受限,因為每個對象只能映射為一個其它對象。但如何我希望將一個對象轉換為多個或零個其他對象呢?`flatMap`這時就會派上用場。
`flatMap`將流中的每個元素,轉換為其它對象的流。所以每個對象會被轉換為零個、一個或多個其它對象,以流的形式返回。這些流的內容之后會放進`flatMap`所返回的流中。
在我們了解`flatMap`如何使用之前,我們需要相應的類型體系:
~~~java
class Foo {
String name;
List<Bar> bars = new ArrayList<>();
Foo(String name) {
this.name = name;
}
}
class Bar {
String name;
Bar(String name) {
this.name = name;
}
}
~~~
下面,我們使用我們自己的關于流的知識來實例化一些對象:
~~~java
List<Foo> foos = new ArrayList<>();
// create foos
IntStream
.range(1, 4)
.forEach(i -> foos.add(new Foo("Foo" + i)));
// create bars
foos.forEach(f ->
IntStream
.range(1, 4)
.forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));
~~~
現在我們擁有了含有三個`foo`的列表,每個都含有三個`bar`。
`flatMap`接受返回對象流的函數。所以為了處理每個`foo`上的`bar`對象,我們需要傳遞相應的函數:
~~~java
foos.stream()
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));
// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3
~~~
像你看到的那樣,我們成功地將含有三個`foo`對象中的流轉換為含有九個`bar`對象的流。
最后,上面的代碼示例可以簡化為流式操作的單一流水線:
~~~java
IntStream.range(1, 4)
.mapToObj(i -> new Foo("Foo" + i))
.peek(f -> IntStream.range(1, 4)
.mapToObj(i -> new Bar("Bar" + i + " <- " + f.name))
.forEach(f.bars::add))
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));
~~~
`flatMap`也可用于Java8引入的`Optional`類。`Optional`的`flatMap`操作返回一個`Optional`或其他類型的對象。所以它可以用于避免煩人的`null`檢查。
考慮像這樣更復雜的層次結構:
~~~java
class Outer {
Nested nested;
}
class Nested {
Inner inner;
}
class Inner {
String foo;
}
~~~
為了處理外層示例上的內層字符串`foo`,你需要添加多個`null`檢查來避免潛在的`NullPointerException`:
~~~java
Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
System.out.println(outer.nested.inner.foo);
}
~~~
可以使用`Optional`的`flatMap`操作來完成相同的行為:
~~~java
Optional.of(new Outer())
.flatMap(o -> Optional.ofNullable(o.nested))
.flatMap(n -> Optional.ofNullable(n.inner))
.flatMap(i -> Optional.ofNullable(i.foo))
.ifPresent(System.out::println);
~~~
如果存在的話,每個`flatMap`的調用都會返回預期對象的`Optional`包裝,否則為`null`的`Optional`包裝。
### `reduce`
歸約操作將所有流中的元素組合為單一結果。Java8支持三種不同類型的`reduce`方法。第一種將流中的元素歸約為流中的一個元素。讓我們看看我們如何使用這個方法來計算出最老的人:
~~~java
persons
.stream()
.reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
.ifPresent(System.out::println); // Pamela
~~~
`reduce`方法接受`BinaryOperator`積累函數。它實際上是兩個操作數類型相同的`BiFunction`。`BiFunction`就像是`Function`,但是接受兩個參數。示例中的函數比較兩個人的年齡,來返回年齡較大的人。
第二個`reduce`方法接受一個初始值,和一個`BinaryOperator`累加器。這個方法可以用于從流中的其它`Person`對象中構造帶有聚合后名稱和年齡的新`Person`對象。
~~~java
Person result =
persons
.stream()
.reduce(new Person("", 0), (p1, p2) -> {
p1.age += p2.age;
p1.name += p2.name;
return p1;
});
System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76
~~~
第三個`reduce`對象接受三個參數:初始值,`BiFunction`累加器和`BinaryOperator`類型的組合器函數。由于初始值的類型不一定為`Person`,我們可以使用這個歸約函數來計算所有人的年齡總和。:
~~~java
Integer ageSum = persons
.stream()
.reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);
System.out.println(ageSum); // 76
~~~
你可以看到結果是76。但是背后發生了什么?讓我們通過添加一些調試輸出來擴展上面的代碼:
~~~java
Integer ageSum = persons
.stream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David
~~~
你可以看到,累加器函數做了所有工作。它首先使用初始值`0`和第一個人Max來調用累加器。接下來的三步中`sum`會持續增加,直到76。
等一下。好像組合器從來沒有調用過?以并行方式執行相同的流會揭開這個秘密:
~~~java
Integer ageSum = persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35
~~~
這個流的并行執行行為會完全不同。現在實際上調用了組合器。由于累加器被并行調用,組合器需要用于計算部分累加值的總和。
下一節我們會深入了解并行流。
## 并行流
流可以并行執行,在大量輸入元素上可以提升運行時的性能。并行流使用公共的`ForkJoinPool`,由`ForkJoinPool.commonPool()`方法提供。底層線程池的大小最大為五個線程 -- 取決于CPU的物理核數。
~~~java
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism()); // 3
~~~
在我的機器上,公共池默認初始化為3。這個值可以通過設置下列JVM參數來增減:
~~~
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
~~~
集合支持`parallelStream()`方法來創建元素的并行流。或者你可以在已存在的數據流上調用銜接方法`parallel()`,將串行流轉換為并行流。
為了描述并行流的執行行為,下面的例子向`sout`打印了當前線程的信息。
~~~java
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
~~~
通過分析調試輸出,我們可以對哪個線程用于執行流式操作擁有更深入的理解:
~~~
filter: b1 [main]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: c2 [ForkJoinPool.commonPool-worker-3]
map: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map: b1 [main]
forEach: B1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-3]
map: a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]
~~~
就像你看到的那樣,并行流使用了所有公共的`ForkJoinPool`中的可用線程來執行流式操作。在連續的運行中輸出可能有所不同,因為所使用的特定線程是非特定的。
讓我們通過添加額外的流式操作`sort`來擴展這個示例:
~~~java
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.sorted((s1, s2) -> {
System.out.format("sort: %s <> %s [%s]\n",
s1, s2, Thread.currentThread().getName());
return s1.compareTo(s2);
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
~~~
結果起初可能比較奇怪:
~~~
filter: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: b1 [main]
map: b1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-2]
map: a1 [ForkJoinPool.commonPool-worker-2]
map: c2 [ForkJoinPool.commonPool-worker-3]
sort: A2 <> A1 [main]
sort: B1 <> A2 [main]
sort: C2 <> B1 [main]
sort: C1 <> C2 [main]
sort: C1 <> B1 [main]
sort: C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]
~~~
`sort`看起來只在主線程上串行執行。實際上,并行流上的`sort`在背后使用了Java8中新的方法`Arrays.parallelSort()`。如[javadoc](https://docs.oracle.com/javase/8/docs/api/java/util/Arrays.html#parallelSort-T:A-)所說,這個方法會參照數據長度來決定以串行或并行來執行。
> 如果指定數據的長度小于最小粒度,它使用相應的`Arrays.sort`方法來排序。
返回上一節中`reduce`的例子。我們已經發現了組合器函數只在并行流中調用,而不在串行流中調用。讓我們來觀察實際上涉及到哪個線程:
~~~java
List<Person> persons = Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));
persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s [%s]\n",
sum, p, Thread.currentThread().getName());
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
sum1, sum2, Thread.currentThread().getName());
return sum1 + sum2;
});
~~~
控制臺的輸出表明,累加器和組合器都在所有可用的線程上并行執行:
~~~
accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2]
combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]
~~~
總之,并行流對擁有大量輸入元素的數據流具有極大的性能提升。但是要記住一些并行流的操作,例如`reduce`和`collect`需要額外的計算(組合操作),這在串行執行時并不需要。
此外我們已經了解,所有并行流操作都共享相同的JVM相關的公共`ForkJoinPool`。所以你可能需要避免實現又慢又卡的流式操作,因為它可能會拖慢你應用中嚴重依賴并行流的其它部分。
- 一.JVM
- 1.1 java代碼是怎么運行的
- 1.2 JVM的內存區域
- 1.3 JVM運行時內存
- 1.4 JVM內存分配策略
- 1.5 JVM類加載機制與對象的生命周期
- 1.6 常用的垃圾回收算法
- 1.7 JVM垃圾收集器
- 1.8 CMS垃圾收集器
- 1.9 G1垃圾收集器
- 2.面試相關文章
- 2.1 可能是把Java內存區域講得最清楚的一篇文章
- 2.0 GC調優參數
- 2.1GC排查系列
- 2.2 內存泄漏和內存溢出
- 2.2.3 深入理解JVM-hotspot虛擬機對象探秘
- 1.10 并發的可達性分析相關問題
- 二.Java集合架構
- 1.ArrayList深入源碼分析
- 2.Vector深入源碼分析
- 3.LinkedList深入源碼分析
- 4.HashMap深入源碼分析
- 5.ConcurrentHashMap深入源碼分析
- 6.HashSet,LinkedHashSet 和 LinkedHashMap
- 7.容器中的設計模式
- 8.集合架構之面試指南
- 9.TreeSet和TreeMap
- 三.Java基礎
- 1.基礎概念
- 1.1 Java程序初始化的順序是怎么樣的
- 1.2 Java和C++的區別
- 1.3 反射
- 1.4 注解
- 1.5 泛型
- 1.6 字節與字符的區別以及訪問修飾符
- 1.7 深拷貝與淺拷貝
- 1.8 字符串常量池
- 2.面向對象
- 3.關鍵字
- 4.基本數據類型與運算
- 5.字符串與數組
- 6.異常處理
- 7.Object 通用方法
- 8.Java8
- 8.1 Java 8 Tutorial
- 8.2 Java 8 數據流(Stream)
- 8.3 Java 8 并發教程:線程和執行器
- 8.4 Java 8 并發教程:同步和鎖
- 8.5 Java 8 并發教程:原子變量和 ConcurrentMap
- 8.6 Java 8 API 示例:字符串、數值、算術和文件
- 8.7 在 Java 8 中避免 Null 檢查
- 8.8 使用 Intellij IDEA 解決 Java 8 的數據流問題
- 四.Java 并發編程
- 1.線程的實現/創建
- 2.線程生命周期/狀態轉換
- 3.線程池
- 4.線程中的協作、中斷
- 5.Java鎖
- 5.1 樂觀鎖、悲觀鎖和自旋鎖
- 5.2 Synchronized
- 5.3 ReentrantLock
- 5.4 公平鎖和非公平鎖
- 5.3.1 說說ReentrantLock的實現原理,以及ReentrantLock的核心源碼是如何實現的?
- 5.5 鎖優化和升級
- 6.多線程的上下文切換
- 7.死鎖的產生和解決
- 8.J.U.C(java.util.concurrent)
- 0.簡化版(快速復習用)
- 9.鎖優化
- 10.Java 內存模型(JMM)
- 11.ThreadLocal詳解
- 12 CAS
- 13.AQS
- 0.ArrayBlockingQueue和LinkedBlockingQueue的實現原理
- 1.DelayQueue的實現原理
- 14.Thread.join()實現原理
- 15.PriorityQueue 的特性和原理
- 16.CyclicBarrier的實際使用場景
- 五.Java I/O NIO
- 1.I/O模型簡述
- 2.Java NIO之緩沖區
- 3.JAVA NIO之文件通道
- 4.Java NIO之套接字通道
- 5.Java NIO之選擇器
- 6.基于 Java NIO 實現簡單的 HTTP 服務器
- 7.BIO-NIO-AIO
- 8.netty(一)
- 9.NIO面試題
- 六.Java設計模式
- 1.單例模式
- 2.策略模式
- 3.模板方法
- 4.適配器模式
- 5.簡單工廠
- 6.門面模式
- 7.代理模式
- 七.數據結構和算法
- 1.什么是紅黑樹
- 2.二叉樹
- 2.1 二叉樹的前序、中序、后序遍歷
- 3.排序算法匯總
- 4.java實現鏈表及鏈表的重用操作
- 4.1算法題-鏈表反轉
- 5.圖的概述
- 6.常見的幾道字符串算法題
- 7.幾道常見的鏈表算法題
- 8.leetcode常見算法題1
- 9.LRU緩存策略
- 10.二進制及位運算
- 10.1.二進制和十進制轉換
- 10.2.位運算
- 11.常見鏈表算法題
- 12.算法好文推薦
- 13.跳表
- 八.Spring 全家桶
- 1.Spring IOC
- 2.Spring AOP
- 3.Spring 事務管理
- 4.SpringMVC 運行流程和手動實現
- 0.Spring 核心技術
- 5.spring如何解決循環依賴問題
- 6.springboot自動裝配原理
- 7.Spring中的循環依賴解決機制中,為什么要三級緩存,用二級緩存不夠嗎
- 8.beanFactory和factoryBean有什么區別
- 九.數據庫
- 1.mybatis
- 1.1 MyBatis-# 與 $ 區別以及 sql 預編譯
- Mybatis系列1-Configuration
- Mybatis系列2-SQL執行過程
- Mybatis系列3-之SqlSession
- Mybatis系列4-之Executor
- Mybatis系列5-StatementHandler
- Mybatis系列6-MappedStatement
- Mybatis系列7-參數設置揭秘(ParameterHandler)
- Mybatis系列8-緩存機制
- 2.淺談聚簇索引和非聚簇索引的區別
- 3.mysql 證明為什么用limit時,offset很大會影響性能
- 4.MySQL中的索引
- 5.數據庫索引2
- 6.面試題收集
- 7.MySQL行鎖、表鎖、間隙鎖詳解
- 8.數據庫MVCC詳解
- 9.一條SQL查詢語句是如何執行的
- 10.MySQL 的 crash-safe 原理解析
- 11.MySQL 性能優化神器 Explain 使用分析
- 12.mysql中,一條update語句執行的過程是怎么樣的?期間用到了mysql的哪些log,分別有什么作用
- 十.Redis
- 0.快速復習回顧Redis
- 1.通俗易懂的Redis數據結構基礎教程
- 2.分布式鎖(一)
- 3.分布式鎖(二)
- 4.延時隊列
- 5.位圖Bitmaps
- 6.Bitmaps(位圖)的使用
- 7.Scan
- 8.redis緩存雪崩、緩存擊穿、緩存穿透
- 9.Redis為什么是單線程、及高并發快的3大原因詳解
- 10.布隆過濾器你值得擁有的開發利器
- 11.Redis哨兵、復制、集群的設計原理與區別
- 12.redis的IO多路復用
- 13.相關redis面試題
- 14.redis集群
- 十一.中間件
- 1.RabbitMQ
- 1.1 RabbitMQ實戰,hello world
- 1.2 RabbitMQ 實戰,工作隊列
- 1.3 RabbitMQ 實戰, 發布訂閱
- 1.4 RabbitMQ 實戰,路由
- 1.5 RabbitMQ 實戰,主題
- 1.6 Spring AMQP 的 AMQP 抽象
- 1.7 Spring AMQP 實戰 – 整合 RabbitMQ 發送郵件
- 1.8 RabbitMQ 的消息持久化與 Spring AMQP 的實現剖析
- 1.9 RabbitMQ必備核心知識
- 2.RocketMQ 的幾個簡單問題與答案
- 2.Kafka
- 2.1 kafka 基礎概念和術語
- 2.2 Kafka的重平衡(Rebalance)
- 2.3.kafka日志機制
- 2.4 kafka是pull還是push的方式傳遞消息的?
- 2.5 Kafka的數據處理流程
- 2.6 Kafka的腦裂預防和處理機制
- 2.7 Kafka中partition副本的Leader選舉機制
- 2.8 如果Leader掛了的時候,follower沒來得及同步,是否會出現數據不一致
- 2.9 kafka的partition副本是否會出現腦裂情況
- 十二.Zookeeper
- 0.什么是Zookeeper(漫畫)
- 1.使用docker安裝Zookeeper偽集群
- 3.ZooKeeper-Plus
- 4.zk實現分布式鎖
- 5.ZooKeeper之Watcher機制
- 6.Zookeeper之選舉及數據一致性
- 十三.計算機網絡
- 1.進制轉換:二進制、八進制、十六進制、十進制之間的轉換
- 2.位運算
- 3.計算機網絡面試題匯總1
- 十四.Docker
- 100.面試題收集合集
- 1.美團面試常見問題總結
- 2.b站部分面試題
- 3.比心面試題
- 4.騰訊面試題
- 5.哈羅部分面試
- 6.筆記
- 十五.Storm
- 1.Storm和流處理簡介
- 2.Storm 核心概念詳解
- 3.Storm 單機版本環境搭建
- 4.Storm 集群環境搭建
- 5.Storm 編程模型詳解
- 6.Storm 項目三種打包方式對比分析
- 7.Storm 集成 Redis 詳解
- 8.Storm 集成 HDFS 和 HBase
- 9.Storm 集成 Kafka
- 十六.Elasticsearch
- 1.初識ElasticSearch
- 2.文檔基本CRUD、集群健康檢查
- 3.shard&replica
- 4.document核心元數據解析及ES的并發控制
- 5.document的批量操作及數據路由原理
- 6.倒排索引
- 十七.分布式相關
- 1.分布式事務解決方案一網打盡
- 2.關于xxx怎么保證高可用的問題
- 3.一致性hash原理與實現
- 4.微服務注冊中心 Nacos 比 Eureka的優勢
- 5.Raft 協議算法
- 6.為什么微服務架構中需要網關
- 0.CAP與BASE理論
- 十八.Dubbo
- 1.快速掌握Dubbo常規應用
- 2.Dubbo應用進階
- 3.Dubbo調用模塊詳解
- 4.Dubbo調用模塊源碼分析
- 6.Dubbo協議模塊