先日、TwitterSDKのTwitterApiClientについてちょっとしたTipsを書こうと思い、サンプルアプリを作り始めました。当初はシンプルなものにする予定だったのですが、RxJavaを勉強中ということもあり、RxJavaで再帰処理を実装してみることにしました。この実装についてAndroidのSlackチームであるandroid-jpにて相談したところ、@hydrakecatさんより素晴らしいフィードバックをいただくことができました。
Slackでせっかく得られた知見は流れてしまうので、ここにまとめておきます。最終的な成果物だけ興味ある方は一番下の実装見ていただくとよいです。
tl;dr
- 再帰処理はSubjectを使うと効率よく書けるよ
- android-jp は最先端のAndroid開発に関する集合知
実装しようとしていたこと
元々個人のアプリでツイートをAPIの限界(3000件)まで取ってくるという処理を再帰処理で実装していました。仕組みとしては
- ツイートデータ200件(一度に取得できる最大件数)をTwitterApiClientで取得しDBに格納
- 一番古いidを引数として次の200件を再帰的に取得
- 一番古いidが前回のレスポンスと同じだったら終了
というものです。 いわゆる再帰処理となるわけですが、RxJavaで実装したらどうなるんだろと思い、実装を始めました。なお、自分のRxJavaレベルは仕事ではリスト処理周りと、簡単な非同期処理やバリデーションで利用していますが、API周りでは使っておらず、エラーハンドリングやObservableの複雑なハンドリングは勉強中といったところです。
最初の実装
最初の実装は以下のような考えで実装しました。
(1) TwitterApiClientから返ってきたツイートデータをObservableにする
(2) (1)のObservableから一番古いidをResultクラスに格納し、再度Observableを作る
(3) (2)のObservableをsubscribeして一番古いidがあれば再度(1)のObservable処理を実行する
これを実装したのが以下になります。
//(1) TwitterApiClientから返ってきたツイートデータをObservableにする public Observable<Observable<List<TweetData>>> fetchMultipleTweets(long sinceId, long maxId) { CustomTwitterApiClient client = CustomTwitterApiClient.getInstance(); final Long tempMaxId = maxId == -1L ? null : maxId; final Long tempSinceId = sinceId == -1L ? null: sinceId; return Observable.create(subscriber -> { try { subscriber.onNext(client.getCustomStatusesService() .userTimeline(null,userName,200,tempSinceId,tempMaxId,false,false,false,true) .flatMap(Observable::from) .map(tweet -> { TweetData data = new TweetData(); data.setMessage(tweet.text); data.setId(tweet.id); data.setName(tweet.user.screenName); return data; }).toList()); subscriber.onCompleted(); } catch (Exception e){ subscriber.onError(e); } }); } public void fetchAllTweets(long sinceId, long maxId) { //(2) (1)のObservableの結果(一番古いid)を作るObservableを作る Observable<Result> observable = Observable.create(subscriber -> { fetchMultipleTweets(sinceId, maxId).subscribe(new Subscriber<Observable<List<TweetData>>>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { subscriber.onError(e); subscriber.onNext(new Result(ResultCode.ERROR, e.toString(), -1, -1)); subscriber.onCompleted(); } @Override public void onNext(Observable<List<TweetData>> tweetDataListObservable) { tweetDataListObservable.isEmpty().subscribe(isEmpty -> { if (isEmpty) { subscriber.onCompleted(); } else { tweetDataListObservable .observeOn(Schedulers.from(threadPoolExecutor)) .subscribe(TweetLoader::putTweetDataList); //最後のid取得 tweetDataListObservable.flatMap(Observable::from) .last().subscribe(tweetData -> { long nextMaxId = tweetData.getId(); if (nextMaxId == maxId) { subscriber.onCompleted(); } else { subscriber.onNext(new Result(ResultCode.SUCCESS, "", sinceId, nextMaxId)); } }); } }); } }); }); //(3) (2)のObservableをsubscribeして一番古いidがあれば再度(1)のObservable処理を実行する observable .subscribeOn(Schedulers.newThread()) .subscribe(new Subscriber<Result>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Timber.e("¥¥ error " + e.getMessage()); } @Override public void onNext(Result result) { if(ResultCode.ERROR.equals(result.resultCode)) { onCompleted(); return; } fetchAllTweets(result.sinceId, result.maxId); } }); }
Slackのandroid-jpのrxjavaチャンネルに投下
一応動くのですが、なんだか良い書き方ではないと直感的に感じていました。川が途中で途切れるし、最初のObservableのネストが尋常じゃなく深いし。で、twitterでもにょもにょしていたところ、@hydrakecatさんにメンションいただいたので、android-jp にpostしました。 そこでいただいたレビューとしては
- onErrorのハンドリングが足りないのでクラッシュする可能性がある
- TwitterApiClientをObservableを生成する度に呼び出しているので、効率が良くない
というものでした。言われてみれば確かにそうです。ベターな実装としては
- OnSubscribe は1つだけにして、内部で synchronized なネットワークコールを何回も叩いて全件取得する
- あるいは、一番古いidをemitするSubjectを用意して、そのツイートを取得するたびに、そのonNextを叩く
という2つをいただきました。
レビュー後の実装
@hydrakecatさんが書いたサンプルコードをベースに書き換えたのが以下です。
public void fetchAllTweets(long sinceId, long maxId) { final CustomTwitterApiClient client = CustomTwitterApiClient.getInstance(); //maxId を emit する Subject を用意して、そのツイートを取得するたびに、その onNext を叩く final PublishSubject<Long> maxIdSubject = PublishSubject.create(); maxIdSubject .flatMap(mid -> { final Long tempMaxId = mid == -1L ? null : mid; final Long tempSinceId = sinceId == -1L ? null : sinceId; return client.getCustomStatusesService() .userTimeline(null,userName,200, tempSinceId,tempMaxId, false,false,false,true) .flatMap(Observable::from) .map(tweet -> { TweetData data = new TweetData(); data.setMessage(tweet.text); data.setId(tweet.id); data.setName(tweet.user.screenName); return data; }).toList().map(list -> new Pair<>(mid, list)); }) .doOnError(CrashOnError.crashOnError()) .observeOn(Schedulers.from(threadPoolExecutor)) .subscribe( p -> { TweetLoader.putTweetDataList(p.second); if (p.second.isEmpty()) { maxIdSubject.onCompleted(); } else { final long nextMaxId = p.second.get(p.second.size() - 1).getId(); if (nextMaxId == p.first) { maxIdSubject.onCompleted(); } else { maxIdSubject.onNext(nextMaxId); } } }, e -> Timber.d("¥¥ error: " + e.getMessage()), () -> Timber.d("¥¥ fetching completed") ); maxIdSubject.onNext(maxId); }
ポイントとしては
- Subjectでidを受け渡すことで、一連のフローが大変簡潔になっている
- Client生成が初回時だけなので効率的
- 実行スレッドを明示的に制御したかったため、
observeOn
でthreadPoolExecutor
を噛ませていますが、ここはScheduler.io()
でもよいかも。 doOnError()
については、maxIdSubject.flatMap内でのエラーで行数がなぜか表示されないためにエラーをキャッチする仕組みをいれてみました。通常はsubscribe
内のonError()
でひろえるかなと。詳しくはサンプルコードをみてもらうと良いです。
というところです。
しかし、まじで @hydrakecat さんのSubjectの実装美しい。
サンプル・コード
こちらにあります。
https://github.com/tomoima525/FetchTweet
まとめ
新しい技術を学んでいると、まだデファクトスタンダードな実装がなかったり、資料も十分ないケースがあるので、所属など関係なく議論できる場は最高ですね。