# 第一個例子
你可以在這里找到JVM平臺幾種語言的例子 [language adaptor](https://github.com/ReactiveX/):
* [RxGroovy 示例](https://github.com/ReactiveX/RxGroovy/tree/1.x/src/examples/groovy/rx/lang/groovy/examples)
* [RxClojure 示例](https://github.com/ReactiveX/RxClojure/tree/0.x/src/examples/clojure/rx/lang/clojure/examples)
* [RxScala 示例](https://github.com/ReactiveX/RxScala/tree/0.x/examples/src/main/scala)
下面的示例從一個字符串列表創建一個Observable,然后使用一個方法訂閱這個Observable。
### Java
```java
public static void hello(String... names) {
Observable.from(names).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("Hello " + s + "!");
}
});
}
```
```java
hello("Ben", "George");
Hello Ben!
Hello George!
```
### Groovy
```groovy
def hello(String[] names) {
Observable.from(names).subscribe { println "Hello ${it}!" }
}
```
```groovy
hello("Ben", "George")
Hello Ben!
Hello George!
```
### Clojure
```clojure
(defn hello
[&rest]
(-> (Observable/from &rest)
(.subscribe #(println (str "Hello " % "!")))))
```
```
(hello ["Ben" "George"])
Hello Ben!
Hello George!
```
### Scala
```scala
import rx.lang.scala.Observable
def hello(names: String*) {
Observable.from(names) subscribe { n =>
println(s"Hello $n!")
}
}
```
```scala
hello("Ben", "George")
Hello Ben!
Hello George!
```
# 如何使用RxJava
要使用RxJava,首先你需要創建Observable(它們發射數據序列),使用Observable操作符變換那些Observables,獲取嚴格符合你要求的數據,然后觀察并處理對這些數據序列(通過實現觀察者或訂閱者,然后訂閱變換后的Observable)。
## 創建Observables
要創建Observable,你可以手動實現Observable的行為,也可以傳遞一個函數給[`create(?)`](../operators/Create.md),還可以使用這些 [創建操作符](../operators/Creating-Observables.md) 將一個已有的數據結構轉換為Observable。
### 已有的數據結構創建Observable
你可以使用[`just(?)`](../operators/Just.md) 和[`from(?)`](../operators/From.md) 方法將對象,列表,對象屬性轉換為發射那些對象的Observable:
```java
Observable<String> o = Observable.from("a", "b", "c");
def list = [5, 6, 7, 8]
Observable<Integer> o = Observable.from(list);
Observable<String> o = Observable.just("one object");
```
轉換后的Observable每發射一項數據,會同步地調用任何訂閱者的[`onNext()`](../Observables.md#回調方法)方法,最后會調用訂閱者的[`onCompleted()`](../Observables.md#回調方法)方法。
### 使用`create(?)`創建一個Observable
使用 [`create(?)`](../operators/Create.md) 方法,你可以創建你自己的Observable,可以實現異步I/O,計算操作,甚至是無限的數據流。
#### 同步的Observable示例
```groovy
/**
* 這個例子展示了一個自定義的Observable,當有訂閱時他會阻塞當前線程。
*/
def customObservableBlocking() {
return Observable.create { aSubscriber ->
50.times { i ->
if (!aSubscriber.unsubscribed) {
aSubscriber.onNext("value_${i}")
}
}
// after sending all values we complete the sequence
if (!aSubscriber.unsubscribed) {
aSubscriber.onCompleted()
}
}
}
// To see output:
customObservableBlocking().subscribe { println(it) }
```
#### 異步的Observable示例
The following example uses Groovy to create an Observable that emits 75 strings.
下面的例子使用`Groovy`創建了一個發射75個字符串的Observable。
為了讓它更清楚,例子很詳細,使用靜態類型和匿名內部類`Func1`:
```groovy
/**
* This example shows a custom Observable that does not block
* when subscribed to as it spawns a separate thread.
*/
def customObservableNonBlocking() {
return Observable.create({ subscriber ->
Thread.start {
for (i in 0..<75) {
if (subscriber.unsubscribed) {
return
}
subscriber.onNext("value_${i}")
}
// after sending all values we complete the sequence
if (!subscriber.unsubscribed) {
subscriber.onCompleted()
}
}
} as Observable.OnSubscribe)
}
// To see output:
customObservableNonBlocking().subscribe { println(it) }
```
這是一個用`Clojure`寫的例子,使用Future(而不是直接用線程),實現很簡潔:
```clojure
(defn customObservableNonBlocking []
"This example shows a custom Observable that does not block
when subscribed to as it spawns a separate thread.
returns Observable<String>"
(Observable/create
(fn [subscriber]
(let [f (future
(doseq [x (range 50)] (-> subscriber (.onNext (str "value_" x))))
; after sending all values we complete the sequence
(-> subscriber .onCompleted))
))
))
```
```clojure
; To see output
(.subscribe (customObservableNonBlocking) #(println %))
```
這個例子從維基百科網站抓取文章,每抓取一篇會調用一次`onNext`:
```clojure
(defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames]
"Fetch a list of Wikipedia articles asynchronously.
return Observable<String> of HTML"
(Observable/create
(fn [subscriber]
(let [f (future
(doseq [articleName wikipediaArticleNames]
(-> subscriber (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
; after sending response to onnext we complete the sequence
(-> subscriber .onCompleted))
))))
```
```clojure
(-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"])
(.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...")))
```
回到`Groovy`,同樣是從維基百科抓取文章,這兒使用閉包代替匿名內部類:
```groovy
/*
* Fetch a list of Wikipedia articles asynchronously.
*/
def fetchWikipediaArticleAsynchronously(String... wikipediaArticleNames) {
return Observable.create { subscriber ->
Thread.start {
for (articleName in wikipediaArticleNames) {
if (subscriber.unsubscribed) {
return
}
subscriber.onNext(new URL("http://en.wikipedia.org/wiki/${articleName}").text)
}
if (!subscriber.unsubscribed) {
subscriber.onCompleted()
}
}
return subscriber
}
}
fetchWikipediaArticleAsynchronously("Tiger", "Elephant")
.subscribe { println "--- Article ---\n${it.substring(0, 125)}" }
```
結果:
```text
--- Article ---
<!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Tiger - Wikipedia, the free encyclopedia</title> ...
--- Article ---
<!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Elephant - Wikipedia, the free encyclopedia</tit ...
```
Note that all of the above examples ignore error handling, for brevity. See below for examples that include error handling.
More information can be found on the [[Observable]] and [[Creating Observables|Creating-Observables]] pages.
注意:為了簡潔,上面的所有例子都忽略了錯誤處理,查看下面包含錯誤處理的例子。
更多的信息可以在這里找到:[`Observable`](Observables.md) 和 [`Creating Observables`](../operators/Creating-Observables.md)。
## 使用變換操作
RxJava讓你可以鏈式使用`操作符`用來轉換和組合多個Observables。
The following example, in Groovy, uses a previously defined, asynchronous Observable that emits 75 items, skips over the first 10 of these ([`skip(10)`](http://reactivex.io/documentation/operators/skip.html)), then takes the next 5 ([`take(5)`](http://reactivex.io/documentation/operators/take.html)), and transforms them ([`map(...)`](http://reactivex.io/documentation/operators/map.html)) before subscribing and printing the items:
下面是一個`Groovy`的例子,使用之前的定義,它會異步發射75個字符串,跳過最開始的10個(([`skip(10)`](../operators/Skip.md)),然后獲取接下來的5個([`take(5)`](../operators/Taks.md)),在訂閱之前使用[`map()`](../operators/Map.md)轉換它們,然后打印結果字符串。
```groovy
/**
* Asynchronously calls 'customObservableNonBlocking' and defines
* a chain of operators to apply to the callback sequence.
*/
def simpleComposition() {
customObservableNonBlocking().skip(10).take(5)
.map({ stringValue -> return stringValue + "_xform"})
.subscribe({ println "onNext => " + it})
}
```
輸出結果
```text
onNext => value_10_xform
onNext => value_11_xform
onNext => value_12_xform
onNext => value_13_xform
onNext => value_14_xform
```
這里有一個圖例解釋了轉換過程:
<img src="../images/operators/Composition.1.png" width="640" height="536" />
這一個例子使用`Clojure`,使用了三個異步的Observable,其中一個依賴另一個,使用[`zip`](../operators/Zip.md)組合這三個發射的數據項為一個單個數據項,最后使用[`map()`](../operators/Map.md)轉換這個結果:
```clojure
(defn getVideoForUser [userId videoId]
"Get video metadata for a given userId
- video metadata
- video bookmark position
- user data
return Observable<Map>"
(let [user-observable (-> (getUser userId)
(.map (fn [user] {:user-name (:name user) :language (:preferred-language user)})))
bookmark-observable (-> (getVideoBookmark userId videoId)
(.map (fn [bookmark] {:viewed-position (:position bookmark)})))
; getVideoMetadata requires :language from user-observable so nest inside map function
video-metadata-observable (-> user-observable
(.mapMany
; fetch metadata after a response from user-observable is received
(fn [user-map]
(getVideoMetadata videoId (:language user-map)))))]
; now combine 3 observables using zip
(-> (Observable/zip bookmark-observable video-metadata-observable user-observable
(fn [bookmark-map metadata-map user-map]
{:bookmark-map bookmark-map
:metadata-map metadata-map
:user-map user-map}))
; and transform into a single response object
(.map (fn [data]
{:video-id videoId
:video-metadata (:metadata-map data)
:user-id userId
:language (:language (:user-map data))
:bookmark (:viewed-position (:bookmark-map data))
})))))
```
輸出是這樣的:
```clojure
{:video-id 78965,
:video-metadata {:video-id 78965, :title House of Cards: Episode 1,
:director David Fincher, :duration 3365},
:user-id 12345, :language es-us, :bookmark 0}
```
這里有一個圖例解釋了這個過程:
<img src="../images/operators/Composition.2.png" width="640" height="742" />
The following example, in Groovy, comes from [Ben Christensen’s QCon presentation on the evolution of the Netflix API](https://speakerdeck.com/benjchristensen/evolution-of-the-netflix-api-qcon-sf-2013). It combines two Observables with the [`merge`](http://reactivex.io/documentation/operators/merge.html) operator, then uses the [`reduce`](http://reactivex.io/documentation/operators/reduce.html) operator to construct a single item out of the resulting sequence, then transforms that item with [`map`](http://reactivex.io/documentation/operators/map.html) before emitting it:
下面的例子使用`Groovy`,來自這里 [Ben Christensen’s QCon presentation on the evolution of the Netflix API](https://speakerdeck.com/benjchristensen/evolution-of-the-netflix-api-qcon-sf-2013),它使用[`merge`](../operators/Merge.md)操作結合兩個Observables,使用[`reduce`](../operators/Reduce.md)操作符從結果序列構建一個單獨的結果數據項,然后在發射之前,使用[`map()`](../operators/Map.md)變換那個結果。
```groovy
public Observable getVideoSummary(APIVideo video) {
def seed = [id:video.id, title:video.getTitle()];
def bookmarkObservable = getBookmark(video);
def artworkObservable = getArtworkImageUrl(video);
return( Observable.merge(bookmarkObservable, artworkObservable)
.reduce(seed, { aggregate, current -> aggregate << current })
.map({ [(video.id.toString() : it] }))
}
```
這里也有一個圖例解釋[`reduce`](../operators/Reduce.md)從多個Observable的結果構建一個單一結構的過程:
<img src="../images/operators/Composition.3.png" width="640" height="640" />
## 錯誤處理
這里是另一個版本的維基百科的例子,包含錯誤處理代碼:
```groovy
/*
* Fetch a list of Wikipedia articles asynchronously, with error handling.
*/
def fetchWikipediaArticleAsynchronouslyWithErrorHandling(String... wikipediaArticleNames) {
return Observable.create({ subscriber ->
Thread.start {
try {
for (articleName in wikipediaArticleNames) {
if (true == subscriber.isUnsubscribed()) {
return;
}
subscriber.onNext(new URL("http://en.wikipedia.org/wiki/"+articleName).getText());
}
if (false == subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
} catch(Throwable t) {
if (false == subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
}
return (subscriber);
}
});
}
```
下面的例子使用`Groovy`,注意錯誤發生時現在是如何調用[`onError(Throwable t)`](Observables.md#回調函數)的,下面的代碼傳遞給[`subscribe()`](../operators/Subscribe.md)第二個方法用戶處理`onError`通知:
```groovy
fetchWikipediaArticleAsynchronouslyWithErrorHandling("Tiger", "NonExistentTitle", "Elephant")
.subscribe(
{ println "--- Article ---\n" + it.substring(0, 125) },
{ println "--- Error ---\n" + it.getMessage() })
```
查看 [`錯誤處理操作符`](../operators/Error-Handling-Operators.md) 這一頁了解更多RxJava中的錯誤處理技術,包括使用 [`onErrorResumeNext()`和`onErrorReturn()`](../operators/Catch.md)等方法,它們讓你可以從錯誤中恢復。
這里是一個`Groovy`的例子:
```groovy
myModifiedObservable = myObservable.onErrorResumeNext({ t ->
Throwable myThrowable = myCustomizedThrowableCreator(t);
return (Observable.error(myThrowable));
});
```
- ReactiveX
- Observables
- Single
- Subject
- Scheduler
- Operators
- 創建操作
- Create
- Defer
- Empty/Never/Throw
- From
- Interval
- Just
- Range
- Repeat
- Start
- Timer
- 變換操作
- Buffer
- FlatMap
- GroupBy
- Map
- Scan
- Window
- 過濾操作
- Debounce
- Distinct
- ElementAt
- Filter
- First
- IgnoreElements
- Last
- Sample
- Skip
- SkipLast
- Take
- TakeLast
- 結合操作
- And/Then/When
- CombineLatest
- Join
- Merge
- StartWith
- Switch
- Zip
- 錯誤處理
- Catch
- Retry
- 輔助操作
- Delay
- Do
- Materialize/Dematerialize
- ObserveOn
- Serialize
- Subscribe
- SubscribeOn
- TimeInterval
- Timeout
- Timestamp
- Using
- To
- 條件和布爾操作
- All/Contains/Amb
- DefaultIfEmpty
- SequenceEqual
- SkipUntil/SkipWhile
- TakeUntil/TakeWhile
- 算術和聚合操作
- Average/Concat/Reduce
- Max/Min/Count/Sum
- 異步操作
- 連接操作
- Connect
- Publish
- RefCount
- Replay
- 轉換操作
- 阻塞操作
- 字符串操作
- 按字母順序的操作符列表
- RxJava文檔和教程
- RxJava入門指南
- RxJava使用示例
- 實現自定義操作符
- 自定義插件
- Backpressure
- 錯誤處理
- Android模塊
- 參與開發
- 補充閱讀材料