RxJava
#Promise #Reactive #Functional #Monad #Stream
前言
RxJava, Reactive Java, 一個 Java FRP (Functional Reactive Programming) 的實現。
可以接龍的 callbacks。
類似於 command line 的 |
的概念。
ls | wc -l
RxJava 大約從 2013 年中一場研討會之後漸漸嶄露頭角。大約一年後 2014 年中後開始許多文章出現(英文)。2014 年底 trello 甚至應聘熟悉 RxJava 的開發者。
有誰在用?先從有貢獻來看:
- Square
- SoundCloud
- NetFlix
短期效用:有效避免巢狀 callback 增加可讀性以及減少 List<item>
的轉換成本。
鑑於 RxJava 太少範例可以參考,才開始撰寫這篇文章偏向範例而不是從學術性角度介紹起,筆者相信從對照的方式來學習會遠比從抽象理論學起,來得容易些。 而這篇文章的技術來源大多是閱讀 RxJava 源碼、測試程式源碼以及其 github Wiki 而來。如果有謬誤之處,歡迎用各種方式聯絡我。
- 注意:這邊直接使用 lambda λ 的表達式,如果你還不清楚,請跳轉到 Lambda
- java8.stream 也實現了 Reactive。
先看一些範例對照後,了解樣貌之後,我們再來討論 RxJava 基本使用概念與方法。
有效解決重複的 loop 增進效能,維持同個 loop
假設我們有一萬名使用者 List<User> users
,其中有五千名女性使用者。
列出使用者年齡:
Before:
List<Integer> getAgeList(List<User> users) {
List<Integer> ageList = new ArrayList<>();
for (User user : users) {
ageList.add(user.getAge());
}
return ageList;
}
After:
Observable<Integer> getAgeObs(List<User> users) {
return Observable.from(users).map(user -> user.getAge());
}
// 如果你堅持一定要傳遞 List
List<Integer> getAgeList(List<User> users) {
return getAgeObs(users).toList().toBlocking().single();
}
列出女性使用者:
Before:
List<User> getFemaleList(/* @Writable */List<User> users) {
List<User> femaleList = users;
Iterator<User> it = users.iterator();
while (it.hasNext()) {
if (it.getGender() != User.FEMALE) it.remove();
}
return femaleList;
}
After:
Observable<User> getFemaleObs(Observable<User> users) {
return Observable.from(users).filter(user -> user.getGender() == User.FEMALE);
}
// 如果你堅持一定要傳遞 List
List<User> getFemaleList(List<User> users) {
return getFemaleObs(users).toList().toBlocking().single();
}
組合一下就可以列出女性使用者年齡:
Before:
List<Integer> getFemaleAgeList(List<User> users) {
return getAgeList(getFemaleList(users)); // 如果不改變寫法,會整整跑完兩個 loop
}
你可以發現原本的寫法有個瑕疵,就是會先繞完一萬使用者,找出來五千名女性後,為了詢問年紀,只好再繞一次這五千名女性。
可以第一次找出女性使用者時,就順便問一下年紀嗎?
為了避免重複迴圈,你可改變寫法,以沿用 loop :
List<Integer> getFemaleAgeList(List<User> users) {
//return getAgeList(getFemaleList(users));
List<Integer> ageList = new ArrayList<>();
for (User user : users) {
if (user.getGender() == User.FEMALE) {
ageList.add(user.getAge());
}
}
return ageList;
}
維持一樣的寫法,它會同時做兩件事情:找出女性順便詢問年紀(過濾與轉換),避免重複的迴圈。
After:
Observable<Integer> getFemaleAgeObs(List<User> users) {
return Observable.from(users)
.filter(user -> user.getGender() == User.FEMALE)
.map(user -> user.getAge());
}
// 如果你堅持一定要傳遞 List
List<Integer> getFemaleAgeList(List<User> users) {
return getFemaleAgeObs(users).toList().toBlocking().single();
}
提前打斷迴圈的能力,避免不必要的過濾與轉換
列出 Top10 女性使用者:
Before:
List<User> getFemaleList(List<User> users, int limit) {
return getFemaleList(users).subList(0, limit);
}
// 這裡會完整繞完萬名使用者,找出千名女性使用者後,才抽出 Top10。
getFemaleList(users, 10);
After:
Observable<User> getFemaleObs(List<User> users, int limit) {
return getFemaleObs(users).take(limit);
}
// 這裡會蒐集到十名女性使用者後,即停止。
getFemaleObs(users, 10);
我們可以從這裡看到差異,儘管你只要找出十名女性使用者,原本的寫法卻會繞完萬名使用者,找出所有女性使用者,再分割前十名。 而 Observable 會聰明地蒐集到第十名女性使用者就馬上停止。原本的寫法要做到提前停止,就必須改寫:
List<User> getFemaleList(/* @Writable */List<User> users) { ... }
List<User> getFemaleList(List<User> users, int limit) {
//return getFemaleList(users).subList(0, limit);
List<User> femaleList = new ArrayList<>();
int i = 0;
for (User user : users) {
if (i >= limit) break;
if (user.getGender() == User.FEMALE) femaleList.add(user);
i++;
}
return femaleList;
}
// 比較靈活一點的寫法,提供 predicate function
List<User> getFemaleList(List<User> users, Func1<Boolean, Integer> predicate) {
List<User> femaleList = new ArrayList<>();
int i = 0;
for (User user : users) {
if (!predicate.call(i)) break;
if (user.getGender() == User.FEMALE) femaleList.add(user);
i++;
}
return femaleList;
}
// 而且 predicate function 可以分 until 或者 filter
List<User> getList(List<User> users, Func2<Boolean, User, Integer> until, Func2<Boolean, User, Integer> filter) {
List<User> list = new ArrayList<>();
int i = 0;
for (User user : users) {
if (!until.call(user, i)) break;
if (filter.call(user, i)) list.add(user);
i++;
}
return list;
}
getFemaleList(users, 10);
getFemaleList(users, i -> i <= 10); // predicate
getList(users, (user, i) -> i <= 10, (user, i) -> user.getGender() == User.FEMALE); // predicate
接下來,開始一點組合應用,列出 Top10 女性使用者年齡:
Before:
List<Integer> getFemaleAgeList(List<User> users, int limit) {
return getAgeList(getFemaleList(users, limit));
}
After:
List<Integer> getFemaleAgeList(List<User> users, int limit) {
return Observable.from(users)
.filter(user -> user.getGender() == User.FEMALE)
.take(limit)
.map(user -> user.getAge())
.toList().toBlocking().single(); // 如果你堅持一定要傳遞 List
}
你可以之後才決定選幾筆,Observable 選幾筆才作幾筆過濾與轉換,有效避免無謂的全數過濾與轉換。
把界面維持 Observable 傳遞,維持一樣的撰寫方法,直接組裝起來就好:
Observable<User> getFemaleObs(Observable<User> userObs) {
return userObs.filter(user -> user.getGender() == User.FEMALE);
}
Observable<Integer> getAgeObs(Observable<User> userObs) {
return userObs.map(user -> p.getAge());
}
Observable<Integer> getFemaleAgeObs(List<User> users) {
return getAgeObs(getFemaleObs(Observable.from(users)));
}
只做 10 筆過濾與轉換(女性、年齡):
getFemaleAgeObs(users)
.take(10) // 拿 10 筆
.toList().toBlocking().single();
拉平巢狀 callback 增加易讀性
Before:
void login(Activity activity, LoginListenr loginListener) {
loginFacebook(activity, fbUser -> {
getFbProfile(fbUser, fbProfile -> {
loginParse(fbProfile, parseUser -> {
getParseProfile(fbProfile, parseProfile -> {
loginListener.onLogin(parseProfile);
});
});
});
});
}
After:
void login(Activity activity, LoginListener loginListener) {
Observable.just(activity)
.flatMap(activity -> loginFacebook(activity))
.flatMap(fbUser -> getFbProfile(fbUser))
.flatMap(fbProfile -> loginParse(fbProfile))
.flatMap(parseUser -> getParseProfile(parseUser))
.subscribe(parseProfile -> loginListener.onLogin(parseProfile));
}
// wrap callback functions in Observable<?>
Observable<FbUser> loginFacebook(Activity activity) {
return Observable.create(sub -> {
loginFacebook(activity, fbUser -> {
sub.onNext(fbUser);
sub.onCompleted();
});
});
}
Observable<FbProfile> getFbProfile(FbUser fbUser) { ... }
Observable<ParseUser> loginParse(FbProfile fbProfile) { ... }
Observable<ParseProfile> getParseProfile(ParseUser parseUser) { ... }
如何導入套用與改變撰寫
既有長時間存取的函式改成 Observable
...
File download(String url) { ... return file; }
Observable<File> downloadObs(String url) {
return Observable.fromCallable(() -> download(url));
}
...
downloadObs(url).subscribeOn(Schedulers.io()).subscribe(file -> {
System.out.println(file);
});
ps. 2015/11 v1.0.15 fromCallable
instead of defer()
既有的 callback 改成 Observable
...
Observable<ParseUser> loginParseWithFacebook(Activity activity) {
return Observable.create(sub -> { // Observable.OnSubscriber
ParseFacebookUtils.logIn(Arrays.asList("public_profile", "email"), activity, new LogInCallback() {
@Override
public void done(final ParseUser parseUser, ParseException err) {
if (err != null) {
sub.onError(err);
} else {
sub.onNext(parseUser);
sub.onCompleted();
}
}
});
})
}
...
loginParseWithFacebook(activity).subscribe(parseUser -> {
System.out.println(parseUser);
});
轉換 map()
我們經常把 List<T>
轉成 List<R>
,如: List<TextView>
轉成 List<String>
,你可能會把整個 textViews 一一取出 toString()
然後抄一份:
List<String> strings = new ArrayList<>();
for (TextView textView : textViews) {
strings.add(textView.getText().toString());
}
萬一 textViews 有一萬筆,最終你其實在存取 strings 通常不會全部都用到,這樣就太浪費了。所以我們拿出牛仔精神 Cow - Copy-On-Write
(Lazy/CallByNeed),先寫好轉換程式,當拿到那筆再去轉換,當然缺點是 textViews 要一直拿著,要稍微留意一下。我們先想像一下,寫一個名稱叫做 MapList 的類別,先把 textViews 拿著,在取出的時候 (@Override public E get(int index)),再去跑轉換程式。這裡我們開放一個 map(Mappable)
好把轉換程式交給我們 (Mappable)。
MapList<T, R> extends ArrayList<R> { // @Unmodifitable
List<T> list;
Mappable<T, R> mapper;
public MapList(List<T> list) {
super();
this.list = list;
}
@Override public R get(int i) {
return mapper.map(list.get(i));
}
public MapList<T, R> map(Mappable<T, R> mapper) {
this.mapper = mapper;
return this;
}
public interface Mappable<T, R> {
R map(T t);
}
}
List<String> strings = new MapList<TextView, String>(textViews).map(textView -> textView.getText().toString());
這是我們自己寫一個 MapList 類別來達成,而現在有了 RxJava:
List<String> strings = new IteratorOnlyList(Observable.from(textViews)
.map(textView -> textView.getText().toString())
.toBlocking()
.getIterator());
如果你想維持 List 界面,為了維持 lazy ,又 RxJava 這邊只有提供到 Iterator ,所以我們沒有使用 toList().toBlocking().single()
,你可以寫一個 IteratorOnlyList 把這個 iterator 包起來,方便傳遞,雖然很多操作都殘缺。
盡可能還是改用 Observable 作為界面吧。
去除重複資料 distinct()
列出有發文的使用者:
Before:
List<User> getPostedUsers(List<Post> posts) {
Map<String, User> users = new HashMap<>();
for (Post post : posts) {
User user = post.getUser();
users.put(user.getObjectId(), user);
}
return new ArrayList<>(users.values());
}
After:
Observable<User> getPostedUsers(List<Post> posts) {
return Observable.from(posts).map(post -> post.getUser())
.distinct(user -> user.getObjectId());
}
多方合併 merge(), concatWith()
列出貼文以及留言的使用者:
Before:
List<User> getActivityUsers(Collection<Post> posts, Collection<Comment> comments) {
Map<String, User> users = new HashMap<>();
for (Post post : posts) {
User user = post.getUser();
users.put(user.getObjectId(), user);
}
for (Comment comment : comments) {
User user = comment.getUser();
users.put(user.getObjectId(), user);
}
return new ArrayList<>(users.values());
}
After:
Observable<User> getActivityUsers(Observable<Post> posts, Observable<Comment> comments) {
return Observable.merge(posts.map(post -> post.getUser()),
comments.map(comment -> comment.getUser()))
.distinct(user -> user.getObjectId());
}
merge()
與 concatWith()
最大的差異是, merge()
是併發同時進貨 ,所以會交錯,而 concatWith
則是排隊等到前面進貨完才換下一位。
優先列出貼文的使用者後,才列出留言的使用者:
Observable<User> getActivityUsers(Observable<Post> posts, Observable<Comment> comments) {
return posts.map(post -> post.getUser())
.concatWith(comments.map(comment -> comment.getUser()))
.distinct(user -> user.getObjectId());
}
如何使用
在我們看過一些對照組之後,大致上瞭解未來在使用上會呈現什麼樣貌。
我們開始回頭學學 What/Why/How,什麼是 RxJava 、為什麼要 RxJava 、如何開始使用 RxJava 。
首先你要認識基本操作元件:Observable
// AsyncTask 版本
AsyncTask<Void, Void, String> helloAsync = new AsyncTask<>() {
@Override public String doInBackground(Void... voids) {
return "Hello, world!";
}
}
// FutureTask 版本
FutureTask<String> helloFuture =
new FutureTask<>(new Callable<String>() {
@Override public String call() {
return "Hello, world!";
}
});
// Observable 版本
Observable<String> helloObs = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
sub.onNext("Hello, world!");
sub.onCompleted(); // 因為 Observable 支援複數的關係,所以需要 onCompleted() ,到時候再細說。
}
}
);
// Observable lambda 版本:
Observable<String> helloObs = Observable.create(sub -> {
sub.onNext("Hello, world!");
sub.onCompleted();
});
這些 AsyncTask, FutureTask, Observable 都是生產者,定義出資料的產生,接下來,當產品生出來的時候通知你。所以我們補上 listener ,RxJava 稱之為 Subscriber:
Subscriber<String> helloSubscriber = new Subscriber<>() {
@Override public void onNext(String string) { System.out.println(string); }
@Override public void onCompleted() { }
@Override public void onError(Throwable e) { }
};
helloObs.subscribe(helloSubscriber); // 你可以下訂(subscribe()) ,產品出產時就會通知你了(Subscriber)。
通常我們會用 subscribe(Action1<? super T> onNext, Action1<Throwable> onError), Action1<? super T> onCompleted)
來搭配 lambda ,寫起來會比較簡便:
helloObs.subscribe(string -> System.out.println(string));
helloObs.subscribe(string -> System.out.println(string), e -> e.printStackTrace());
helloObs.subscribe(string -> System.out.println(string), e -> e.printStackTrace(), () -> System.out.println("onCompleted"));
我們再稍微回到 Observable.create() , 如果你的原料早就準備好了,我們可以寫成:
Observable.just("Hello, world!").subscribe(string -> System.out.println(string));
接下來是簡單的加工:
Observable.just("Hello, world!")
.map(string -> "yongjhih: " + string) // "yongjhih: Hello, world!"
.map(string -> string.length())
.subscribe(length -> System.out.println(length)); // 21, 請不要真的去算長度, 此處僅示意.
所以從原物料的進貨作業(create(), just())到加工(map())到下訂(subscribe()) 有了流程上的基本認識。我們可以討論幾個站點的作用與概念:
首先,生產過程(進貨與加工)會被認定不可預期的處理時間,很有可能會很久的意思。
另一個是,JIT(Just In Time),零庫存概念,也就是說你可以定義很多道加工手續,但是再沒有下訂之前,通通都不會開跑的,包括進貨作業。
接下來,換一個比較實際的例子:
Observable.just("http://yongjhih.gitbooks.io/feed/content/RxJava.html")
.map(url -> download(url))
.subscribeOn(Schedulers.io()) // 把生產加工過程丟到背景去做
.subscribe(file -> System.out.println(file));
如果你有很多網址要下載,你可能會想到:
Observable<List<String>> urlList = Observable.just(Arrays.asList("http://yongjhih.gitbooks.io/feed/content/RxJava.html",
"http://yongjhih.gitbooks.io/feed/content/README.html"));
你可以留意到這時 Observable 產線上是 List<String>
了。所以你可能會這樣做:
urlList.map(list -> {
List<File> files = new ArrayList<>();
for (String url : urls) files.add(download(url));
return files;
})
.subscribeOn(Schedulers.io())
.subscribe(files -> {
for (File file : files) System.out.println(file);
});
但是這種情況,你應該使用 Observable.from(List<T>)
或 Observable.just(T...)
:
Observable<String> urls = Observable.from(Arrays.asList("http://yongjhih.gitbooks.io/feed/content/RxJava.html",
"http://yongjhih.gitbooks.io/feed/content/README.html"));
// or Observable<String> urls = Observable.just("http://yongjhih.gitbooks.io/feed/content/RxJava.html",
"http://yongjhih.gitbooks.io/feed/content/README.html");
urls.map(url -> download(url))
.subscribeOn(Schedulers.io())
.subscribe(file -> System.out.println(file));
如果原料是 List,但是加工時,想要一個一個處理,用 Observable.from() 會比較好操作,如果你用 Observable.just() 那就會拿到一個 List 。其實有個方法可以展開: .flatMap(list -> Observable.from(list))
:
Observable.just(Arrays.asList("http://yongjhih.gitbooks.io/feed/content/RxJava.html",
"http://yongjhih.gitbooks.io/feed/content/README.html"))
.map(urls -> {
List<File> files = new ArrayList<>();
for (String url : urls) files.add(download(url));
return files;
})
.flatMap(files -> Observable.from(files)) // 可以這裡才展開
.subscribeOn(Schedulers.io())
.subscribe(file -> System.out.println(file));
map()
是轉換產線上的物件 , flatMap() 是轉換 Observable<Object>
的方法,舉個例子:
Observable.just("http://yongjhih.gitbooks.io/feed/content/RxJava.html")
.flatMap(url -> downloadObs(url))
.map(file -> file.size())
.susbscirbe(size -> Systen.out.println(size));
Observable<File> downloadObs(String url) { ... }
產生器與流量控制
原料無中生有。
印出 1 到 10:
Before:
for (int i = 1; i <= 10; i++) {
System.out.println(i);
}
After:
Observable.range(1, 10).forEach(i -> System.out.println(i));
這邊第一次使用 forEach()
,僅差 subscribe()
會回傳 Subscription
,Subscription
是用來停止生產的。
秒針:
// 計秒器
Subscription subscription = Observable.interval(1, TimeUnit.SECONDS).subscribe(l -> System.out.println(l));
Observable.just(sub).delay(10, TimeUnit.SECONDS).subscribe(s -> s.unsubscribe()); // 10 秒後停掉計秒器
片語 toBlocking().single()
這行會卡住,直到拿到一個為止。
single(), take(1), first()
single() 與 take(1) 最大的不同是,single() 只能使用在單數的 Observable 上,否則會噴 exception 。
例如:
List<Integer> integers = Observable.range(1, 2).toList().single(); // pass
Integer i = Observable.range(1, 2).toBlocking().take(1); // pass
Observable<Integer> singleInteger = Observable.range(1, 1).single(); // pass
Observable<Integer> singleInteger = Observable.range(1, 2).single(); // exception
Subscriber/Observer
Subscriber/Observer 是一種 Listener/Callback ,只是一般 Listener/Callback 只有表示一種事件,例如:onCompleted(T emit)
,在 RxJava 的安排則是:onNext(T)
, onCompleted()
, onError(Throwable)
。
Observer 只有這三個事件而 Subscriber 繼承了並多了 unsubscribe()
, isUnsubscribed()
Subscriber:
public abstract class Subscriber<T> implements Observer<T>, Subscription { ... }
public interface Subscription {
public void unsubscribe();
public boolean isUnsubscribed();
}
public interface Observer<T> {
public abstract void onCompleted();
public abstract void onError(Throwable e);
public abstract void onNext(T t);
}
名詞解釋
Observable 觀測所 Subscriber 訂閱者 Observer 觀察員
描述這是怎樣的工作,以及中間需要的製程,希望產生出什麼產品。
Observable
Subscriber/Observer
subscribe 下訂。
Subscription 訂單, subscribe 下訂之後產生出來的訂單,這個訂單可以用來取消訂單來中止生產。
Eager vs. Lazy
動手玩
git clone https://github.com/yongjhih/RxJava-retrofit-github-sample
cd RxJava-retrofit-github-sample
./gradlew execute
修改 src/main/java/com/github/yongjhih/Main.java 內容就可以自己玩了。
組合資料 zip()
Observable<User> getUser(Activity activity) {
Observable.zip(getFbUser(activity), getParseUser(activity),
(fbUser, parseUser) -> getUser(fbUser, parseUser));
}
Subject
相當於 Event Bus,多方進貨,多方出貨。開放式,俗稱 Hot ,對應之前封閉式的 Observable 為 Cold 。
多方進貨,相當於把 Observable.OnSubscriber ,讓其他人可以塞資料。
Before:
Observable.just("hello, world!").subscribe(System.out::println);
After:
Subject<String, String> subject = PublishSubject.create();
subject.asObservable().subscribe(System.out::println);
subject.onNext("hello, world!");
找一個實際點的例子:
Subject<View> mLikeCountSubject = PublishSubject.create();
@OnClick(R.id.like_button)
public void onLikeClick(View view) {
mLikeCountSubject.onNext(view);
}
@Override
public void onResume() {
super.onResume();
mLikeCountSubject.asObservable().map(view -> 1)
.scan((count, i) -> count + i)
.subscribe(count -> likeText.setText(count.toString()));
}
http://reactivex.io/documentation/subject.html
Exception 處理
如果發生 exception 重試,會重新進貨,從頭再跑一輪。
最常用的是 retry(Func2<Integer, Throwable, Boolean> predicate)
如果是 NullPointerException 才重試:
retry((c, e)) -> e instanceof NullPointerException);
一直重試:
retry()
重試 3 次:
retry(3)
使用 Handler retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler)
,例如,隨著重試次數延後重試時間:
Observable.create((Subscriber<? super String> s) -> {
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempt -> {
return attempt.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
System.out.println("delay retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
}).toBlocking().subscribe(System.out::println);
忽略 exception:
.onErrorResumeNext(e -> Observable.empty()); // onCompleted()
或傳個替代資料:
.flatMap(parseUser -> {
return Observable.zip(createChannel(parseUser), createFriendRole(parseUser), (user, user2) -> user)
.onErrorResumeNext(e -> Observable.just(parseUser));
})
flatMap()
p.s. 似乎很多讀者對於 flatMap()
有理解的困難,所以這裡特別解釋一下 flatMap
flat and map 拉平並轉換,通常筆者稱呼成 Observable 的「置換」。
concatMap() 與 flatMap()
Observable<File> downloadObs(String url) { ... }
Observable.just("https://raw.githubusercontent.com/yongjhih/android-gitbook/master/README.md", https://raw.githubusercontent.com/yongjhih/android-gitbook/master/RxJava.md)
.flatMap(s -> downloadObs(s).subscribeOn(Schedulers.io()))
.subscribe(f -> System.out.println("downloaded: " + f));
這裡會同時開兩個 threads 在下載。避免這種情況,希望一個一個跑:
Observable.just("https://raw.githubusercontent.com/yongjhih/android-gitbook/master/README.md", https://raw.githubusercontent.com/yongjhih/android-gitbook/master/RxJava.md)
.concatMap(s -> downloadObs(s).subscribeOn(Schedulers.io()))
.subscribe(f -> System.out.println("downloaded: " + f));
switchMap() 與 flatMap()
swtichMap 主要用於序列化,而 flatMap 是允許並列。
排序 toSortedList()
分組 groupBy()
分段 window()
快取 cache()
取樣與防連點 throttleLast()/debounce()
.throttleLast(100, TimeUnit.MILLISECONDS) // 限制流量 (sample())
.debounce(200, TimeUnit.MILLISECONDS) // 延遲觸發 (當作詠唱時間)
.onBackpressureLatest()
Single
簡化版本的 Observable。
- 原本的
Observable.subscribe(item -> {}, e -> {}, () -> {})
- 簡化成
Single.subscribe(item -> {}, e -> {})
因為數量只有一個,所以不需要告知結束(onCompleted()
)
Observable 的 Subscriber:
public abstract class Subscriber<T> implements Observer<T>, Subscription {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
// ...
}
public abstract class SingleSubscriber<T> implements Subscription {
public abstract void onSuccess(T value);
public abstract void onError(Throwable error);
// ...
}
Single.toObservable()
類似於 Observable.single()
效果
Completable
簡化版本的 Observable/Single
- 原本的
Observable.subscribe(item -> {}, e -> {}, () -> {})
- 原本的
Single.subscribe(item -> {}, e -> {})
- 簡化成
Completable.subscribe(() -> {}, e -> {})
public interface CompletableSubscriber {
void onCompleted();
void onError(Throwable e);
// ...
}
Completable.fromAction(() -> System.out.println("Hello World!"))
.subscribe();
利用 compose(Transformer) 重用常用的流程組合
Compose/Transformer 0.20 版本(2014.8)
先丟背景等等回來前景:
Before:
.observeOn(AndroidScheduler.mainThread())
.subscribeOn(Schedulers.io))
After:
.compose(mainAsync())
<T> Transformer<T, T> mainAsync() {
return obs -> obs.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
計數器:
Before:
.map(o -> 1)
.scan((count, i) -> count + i)
After:
.compose(amount())
<T> Transformer<T, T> amount() {
return obs -> obs.map(o -> 1)
.scan((count, i) -> count + i);
}
public class Transformers {
@SuppressWarnings("unchecked")
private static final Transformer MAIN_ASYNC = obs -> obs.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
@SuppressWarnings("unchecked")
public static final <T> Transformer<T, T> mainAsync() {
return MAIN_ASYNC;
}
@SuppressWarnings("unchecked")
private static final Transformer AMOUNT = obs -> obs.map(o -> 1)
.scan((count, i) -> count + i);
@SuppressWarnings("unchecked")
public static final <T> Transformer<T, T> amount() {
return AMOUNT;
}
}
附錄:Android View Observable 範例
寫一個讚計數器:
ViewObservable.clicks(findViewById(R.id.like_button))
.map(clickEvent -> 1)
.scan((count, i) -> count + i)
.subscribe(likes -> {
TextView likesView = (TextView) findViewById(R.id.likes_view);
textView.setText(likes.toString());
});
透過 Observable 實現 Java8 Optional
在沒有 Optional 的狀況下:
Before:
String displayName(ParseUser parseUser) {
String displayName = parseUser.getString("displayName");
if (displayName == null) {
displayName = "Unnamed";
}
return displayName;
}
After:
String displayName(ParseUser parseUser) {
return Optional.ofNullable(parseUser.getString("displayName")).orElse("Unnamed");
}
// Optional 實現:
public class Optional<T> {
Observable<T> obs;
public Optional(Observable<T> obs) {
this.obs = obs;
}
public static <T> Optional<T> of(T value) {
if (value == null) {
throw new NullPointerException();
} else {
return new Optional<T>(Observable.just(value));
}
}
public static <T> Optional<T> ofNullable(T value) {
if (value == null) {
return new Optional<T>(Observable.empty());
} else {
return new Optional<T>(Observable.just(value));
}
}
public T get() {
return obs.toBlocking().single();
}
public T orElse(T defaultValue) {
return obs.defaultIfEmpty(defaultValue).toBlocking().single();
}
}
Oracle 官方比較複雜的 Optional 範例,當各類別傳遞已作成 Optional 界面:
Before:
String getVersion(Computer computer) {
String version = "UNKNOWN";
if (computer != null) {
Soundcard soundcard = computer.getSoundcard();
if (soundcard != null) {
USB usb = soundcard.getUsb();
if (usb != null) {
version = usb.getVersion();
}
}
}
return version;
}
After:
// Optional 版本
String getVersion(Computer computer) {
return computer.flatMap(Computer::soundcard) // soundcard()
.flatMap(Soundcard::usb) // usb()
.map(Usb::getVersion)
.orElse("UNKNOWN");
}
public class Computer {
private Soundcard soundcard;
public Optional<Soundcard> soundcard() { return Optional.ofNullable(soundcard); }
public Soundcard getSoundcard() { return soundcard; }
}
public class Soundcard {
private Usb usb;
public Optional<Usb> usb() { return Optional.ofNullable(usb); }
public Usb getUsb() { return usb; }
}
public class Usb {
String version;
public String getVersion() { return version; }
}
P.S. Groovy 語言: String version = computer?.getSoundcard()?.getUSB()?.getVersion();
如果真的要用 RxJava Optional 請用專門的專案 https://github.com/eccyan/RxJava-Optional
這邊僅是教學需要。 (https://gist.github.com/yongjhih/25017ac41efb4634c2ab)
Threading, Scheduler 執行序控制
Observable.just("http://yongjhih.gitbooks.io/feed/content/RxJava.html").map(url -> download(url))
.subscribeOn(Schedulers.io())) // 在 io Thread() 跑 (60 thread pool, 60s timeout)
.observeOn(AndroidScheduler.mainThread()) // 在 mainThread 回來印
.subscirbe(System.out::println);
...
.subscribeOn(Schedulers.io())) // 這之前都用 io thread pool
.observeOn(AndroidScheduler.mainThread()) // 這裡之後都用 mainThread 跑
...
.subscirbe(System.out::println);
常用 thread pool:
Schedulers.newThread()
無條件開一個 thread (ref. NewThreadWorker)Schedulers.io()
io 性質的 thread ,如網路存取。60s 逾時 (ref. CachedThreadScheduler)Schedulers.computation()
運算性質的 thread。thread pool 依據 cpu 數量 (ref. EventLoopsScheduler)AndroidScheduler.mainThread()
(ref. AndroidSchedulers)
ref. http://reactivex.io/documentation/scheduler.html#collapseRxJava
改善 Cache 流程
Observable.merge(ParseObservable.getPostsFromLocalDatabase(), ParseObservable.getPosts()).distinct(o -> o.getObjectId())
或者用 amb()
, first()
等方式作
Notification
Backpressure
Testing
TestSubscriber
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
Observable.just("yongjhih", "rxassertions").toBlocking().subscribe(testSubscriber);
testSubscriber.assertNoErrors();
testSubscriber.assertCompleted();
testSubscriber.assertValues("yongjhih", "rxassertions");
Assertions
rxassertions
RxAssertions.assertThat(Observable.just("yongjhih", "rxassertions"))
.withoutErrors()
.completes()
.expectedValues("yongjhih", "rxassertions");
or
Single<String> single = Single.fromCallable(() -> "RxJava");
RxAssertions.assertThat(single).completes().expectedSingleValue("RxJava");
Deprecated: assertj-rx:
assertThat(observable.toBlocking())
.completes()
.emitsSingleValue("hello");
常見手法
目前似乎還不支援:
Observable<Boolean> Observable.isNotEmpty()
一般的替代作法是:
obs.isEmpty().map(b -> !b);
目前似乎還不支援:
Observable<Boolean> Observable.contains(T... )
一般的作法是:
obs.exists(o -> o == v1 || o == v2);
or
obs.filter(o -> o == v1 || o == v2).isEmpty().map(b -> !b);
Hooks
- RxJavaObservableExecutionHook
- RxJavaSchedulersHook
- RxJavaErrorHandler
後記
這裡鮮少討論一般常見的理論:無論是 FRP, monad, push/pull, cold/hot 。主要因為先以範例,也就是實際看得到的改善是什麼來作介紹。
http://yarikx.github.io/NotRxJava/ 也是從這種方式帶領你看發展進程,希望大家會喜歡
See Also
- 章節:輕量資料流處理
文獻:
- https://speakerdeck.com/jakewharton/2014-1?slide=13
- http://slides.com/yaroslavheriatovych/frponandroid
- http://en.wikipedia.org/wiki/Null_coalescing_operator
- http://www.oracle.com/technetwork/articles/java/java8-optional-2175753.html
- http://java.dzone.com/articles/java-8-elvis-operator
- http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/
- https://blog.growth.supply/party-tricks-with-rxjava-rxandroid-retrolambda-1b06ed7cd29c
一些 Rx 開源:
- https://github.com/yongjhih/RxParse
- https://github.com/ogaclejapan/RxBinding
- https://github.com/eccyan/RxJava-Optional
- https://github.com/square/sqlbrite
- https://github.com/pushtorefresh/storio
- https://github.com/mcharmas/Android-ReactiveLocation
- https://github.com/pardom/Ollie
- https://github.com/frankiesardo/ReactiveContent
- https://github.com/evant/rxloader
- https://github.com/vyshane/rex-weather
- https://github.com/ReactiveX/RxNetty
- https://github.com/vbauer/caesar
- https://gist.github.com/ivacf/874dcb476bfc97f4d555
1 rx.android.observables.AndroidObservable
has changed to rx.android.app.AppObservable
(Version 0.24 – January 3rd 2015)