tomoima525's blog

Androidとか技術とかその他気になったことを書いているブログ。世界の秘密はカレーの中にある!サンフランシスコから発信中。

RxJavaで再帰処理をする、あるいは集合知は素晴らしいという話

f:id:tomoima525:20160424160637p:plain:w400
先日、TwitterSDKのTwitterApiClientについてちょっとしたTipsを書こうと思い、サンプルアプリを作り始めました。当初はシンプルなものにする予定だったのですが、RxJavaを勉強中ということもあり、RxJavaで再帰処理を実装してみることにしました。この実装についてAndroidのSlackチームであるandroid-jpにて相談したところ、@hydrakecatさんより素晴らしいフィードバックをいただくことができました。
Slackでせっかく得られた知見は流れてしまうので、ここにまとめておきます。最終的な成果物だけ興味ある方は一番下の実装見ていただくとよいです。

tl;dr

実装しようとしていたこと

元々個人のアプリでツイートをAPIの限界(3000件)まで取ってくるという処理を再帰処理で実装していました。仕組みとしては

  • ツイートデータ200件(一度に取得できる最大件数)をTwitterApiClientで取得しDBに格納
  • 一番古いidを引数として次の200件を再帰的に取得
  • 一番古いidが前回のレスポンスと同じだったら終了

というものです。 いわゆる再帰処理となるわけですが、RxJavaで実装したらどうなるんだろと思い、実装を始めました。なお、自分のRxJavaレベルは仕事ではリスト処理周りと、簡単な非同期処理やバリデーションで利用していますが、API周りでは使っておらず、エラーハンドリングやObservableの複雑なハンドリングは勉強中といったところです。

最初の実装

最初の実装は以下のような考えで実装しました。
(1) TwitterApiClientから返ってきたツイートデータをObservableにする
(2) (1)のObservableから一番古いidをResultクラスに格納し、再度Observableを作る
(3) (2)のObservableをsubscribeして一番古いidがあれば再度(1)のObservable処理を実行する

これを実装したのが以下になります。

//(1) TwitterApiClientから返ってきたツイートデータをObservableにする
public Observable<Observable<List<TweetData>>> fetchMultipleTweets(long sinceId, long maxId) {
     CustomTwitterApiClient client = CustomTwitterApiClient.getInstance();
     final Long tempMaxId = maxId == -1L ? null : maxId;
     final Long tempSinceId = sinceId == -1L ? null: sinceId;
     return Observable.create(subscriber -> {
         try {
             subscriber.onNext(client.getCustomStatusesService()
                     .userTimeline(null,userName,200,tempSinceId,tempMaxId,false,false,false,true)
                     .flatMap(Observable::from)
                     .map(tweet -> {
                         TweetData data = new TweetData();
                         data.setMessage(tweet.text);
                         data.setId(tweet.id);
                         data.setName(tweet.user.screenName);
                         return data;
                     }).toList());
             subscriber.onCompleted();
         } catch (Exception e){
             subscriber.onError(e);
          }
      });
}


public void fetchAllTweets(long sinceId, long maxId) {
    
    //(2) (1)のObservableの結果(一番古いid)を作るObservableを作る
    Observable<Result> observable = Observable.create(subscriber -> {
        fetchMultipleTweets(sinceId, maxId).subscribe(new Subscriber<Observable<List<TweetData>>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
                subscriber.onNext(new Result(ResultCode.ERROR, e.toString(), -1, -1));
                subscriber.onCompleted();
            }

            @Override
            public void onNext(Observable<List<TweetData>> tweetDataListObservable) {
                tweetDataListObservable.isEmpty().subscribe(isEmpty -> {
                    if (isEmpty) {
                        subscriber.onCompleted();
                    } else {
                        tweetDataListObservable
                                .observeOn(Schedulers.from(threadPoolExecutor))
                                .subscribe(TweetLoader::putTweetDataList);

                        //最後のid取得
                        tweetDataListObservable.flatMap(Observable::from)
                                .last().subscribe(tweetData -> {
                            long nextMaxId = tweetData.getId();
                            if (nextMaxId == maxId) {
                                subscriber.onCompleted();
                            } else {
                                subscriber.onNext(new Result(ResultCode.SUCCESS, "", sinceId, nextMaxId));
                            }
                        });
                    }
                });
            }
        });
    });

    //(3) (2)のObservableをsubscribeして一番古いidがあれば再度(1)のObservable処理を実行する
    observable
            .subscribeOn(Schedulers.newThread())
            .subscribe(new Subscriber<Result>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
            Timber.e("¥¥ error " + e.getMessage());
        }

        @Override
        public void onNext(Result result) {
            if(ResultCode.ERROR.equals(result.resultCode)) {
                onCompleted();
                return;
            }
            fetchAllTweets(result.sinceId, result.maxId);
        }
    });
}

Slackのandroid-jpのrxjavaチャンネルに投下

一応動くのですが、なんだか良い書き方ではないと直感的に感じていました。川が途中で途切れるし、最初のObservableのネストが尋常じゃなく深いし。で、twitterでもにょもにょしていたところ、@hydrakecatさんにメンションいただいたので、android-jp にpostしました。 そこでいただいたレビューとしては

  • onErrorのハンドリングが足りないのでクラッシュする可能性がある
  • TwitterApiClientをObservableを生成する度に呼び出しているので、効率が良くない

というものでした。言われてみれば確かにそうです。ベターな実装としては

  • OnSubscribe は1つだけにして、内部で synchronized なネットワークコールを何回も叩いて全件取得する
  • あるいは、一番古いidをemitするSubjectを用意して、そのツイートを取得するたびに、そのonNextを叩く

という2つをいただきました。

レビュー後の実装

@hydrakecatさんが書いたサンプルコードをベースに書き換えたのが以下です。

public void fetchAllTweets(long sinceId, long maxId) {
    final CustomTwitterApiClient client = CustomTwitterApiClient.getInstance();
    //maxId を emit する Subject を用意して、そのツイートを取得するたびに、その onNext を叩く
    final PublishSubject<Long> maxIdSubject = PublishSubject.create();
    maxIdSubject
            .flatMap(mid -> {
                final Long tempMaxId = mid == -1L ? null : mid;
                final Long tempSinceId = sinceId == -1L ? null : sinceId;
                return client.getCustomStatusesService()
                        .userTimeline(null,userName,200,
                                tempSinceId,tempMaxId,
                                false,false,false,true)
                        .flatMap(Observable::from)
                        .map(tweet -> {
                            TweetData data = new TweetData();
                            data.setMessage(tweet.text);
                            data.setId(tweet.id);
                            data.setName(tweet.user.screenName);
                            return data;
                        }).toList().map(list -> new Pair<>(mid, list));
            })
            .doOnError(CrashOnError.crashOnError())
            .observeOn(Schedulers.from(threadPoolExecutor))
            .subscribe(
                    p -> {
                        TweetLoader.putTweetDataList(p.second);
                        if (p.second.isEmpty()) {
                            maxIdSubject.onCompleted();
                        } else {
                            final long nextMaxId = p.second.get(p.second.size() - 1).getId();
                            if (nextMaxId == p.first) {
                                maxIdSubject.onCompleted();
                            } else {
                                maxIdSubject.onNext(nextMaxId);
                            }
                        }
                    },
                    e -> Timber.d("¥¥ error: " + e.getMessage()),
                    () -> Timber.d("¥¥ fetching completed")
            );
    maxIdSubject.onNext(maxId);
}

ポイントとしては

  • Subjectでidを受け渡すことで、一連のフローが大変簡潔になっている
  • Client生成が初回時だけなので効率的
  • 実行スレッドを明示的に制御したかったため、observeOnthreadPoolExecutorを噛ませていますが、ここはScheduler.io()でもよいかも。
  • doOnError()については、maxIdSubject.flatMap内でのエラーで行数がなぜか表示されないためにエラーをキャッチする仕組みをいれてみました。通常はsubscribe内のonError()でひろえるかなと。詳しくはサンプルコードをみてもらうと良いです。

というところです。
しかし、まじで @hydrakecat さんのSubjectの実装美しい。

サンプル・コード

こちらにあります。

https://github.com/tomoima525/FetchTweet

まとめ

新しい技術を学んでいると、まだデファクトスタンダードな実装がなかったり、資料も十分ないケースがあるので、所属など関係なく議論できる場は最高ですね。