October 14, 2018

Death of AsyncTasks: A Short Introduction to RxJava

When AsyncTask was introduced to Android, it was described as “Painless Threading.” Its goal was to perform background operations and publish results on the UI thread without having to manipulate threads and/or handlers. It succeeded to some extent, but it is not exactly painless. Below are a few areas where AsyncTask becomes a pain if you use it without fully understanding how it behaves:

  • AsyncTask and Activity Lifecycle
  • Cancelling AsyncTasks
  • Limit on Concurrent AsyncTasks

Introduction

Reactive programming, RxJava and RxAndroid have become familiar names in the Android app development world. RxAndroid has a steep learning curve. To be honest, at first it appeared a little overwhelming and strange. The way I dealt with it was to dive into coding and look at examples, and eventually it all started to make sense.

The aim of this article is to serve as an introduction for developers who are not familiar with the concept, and to encourage them to start using RxJava and enjoy its benefits.

Reactive programming concepts

Reactive programming is an extension of the Observer design pattern. There are two basic and very important items in reactive programming: Observables and Observers.

Observables publish values, while Observers subscribe to Observables, watching them and reacting to the values they emit. In RxJava/RxAndroid, observers are instances of the Observer interface, and observables are instances of the Observable class. When the observable emits a new item, the onNext() method is called on the observer. If the observable finishes its data flow successfully, the onComplete() method is called. Similarly, if the observable finishes its data flow with an error, the onError() method is called.
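To make the callback order concrete, here is a minimal plain-Java sketch of the contract described above. It does not use RxJava: SimpleObserver, subscribe() and record() are hypothetical names invented for this illustration, and the real Observer interface also has an onSubscribe() method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical, simplified stand-ins for RxJava's Observable and Observer.
class ObserverContractSketch {

    interface SimpleObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onComplete();
    }

    // Produces one value from the source and notifies the observer:
    // onNext then onComplete on success, onError alone on failure.
    static <T> void subscribe(Callable<T> source, SimpleObserver<T> observer) {
        T value;
        try {
            value = source.call();
        } catch (Exception e) {
            observer.onError(e);
            return;
        }
        observer.onNext(value);
        observer.onComplete();
    }

    // Records every callback so the order of events can be inspected.
    static List<String> record(Callable<Integer> source) {
        final List<String> events = new ArrayList<>();
        subscribe(source, new SimpleObserver<Integer>() {
            @Override public void onNext(Integer item) { events.add("onNext:" + item); }
            @Override public void onError(Throwable error) { events.add("onError:" + error.getMessage()); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(() -> 42));   // [onNext:42, onComplete]
        System.out.println(record(() -> { throw new IllegalStateException("boom"); }));   // [onError:boom]
    }
}
```

Note that a failing source produces only onError(); onComplete() is not called afterwards.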

Another component in RxJava is the Scheduler. We will talk about schedulers while walking through the code.

Let's jump into the code, which can be found here: https://github.com/jacksvarghese86/RxJavaIntro. The example code is written using RxJava2 & RxAndroid2.

Example

Let's create a method that takes a long time to complete.

private ArrayList<Integer> fetchLargeData() {
    Log.d("RxJava", "fetchLargeData: "+Thread.currentThread().getName());
    ArrayList<Integer> data = new ArrayList<>();
    data.add(1);
    data.add(2);
    data.add(3);
    data.add(4);
    data.add(5);
    //Adding a loop to simulate long running operation.
    long count = 0;
    while (count < 9999999999L) {
        ++count;
    }
    return data;
}

A loop is added to simulate a long-running operation. Call this method from a button click handler.

If this method is called directly, it triggers an ANR (Application Not Responding) error, since it blocks the main thread for more than 5 seconds. Traditionally we would use an AsyncTask to call the method in the background and publish the results on the main thread. For a change, we are going to use RxJava here.

Steps required:

  • Create an Observable
  • Create an Observer
  • Subscribe – Connect Observable and Observer
  • Use Schedulers if required
  • Unsubscribe

Create an Observable

Create an Observable from the entity we are interested in, which is the return value of fetchLargeData().

The Observable class has many static methods, called operators, to create Observable objects. The following code shows you how to use the fromCallable() operator to create a very simple Observable that emits an ArrayList of Integers.

Observable<ArrayList<Integer>> aSyncDataObservable = Observable.fromCallable(new Callable<ArrayList<Integer>>() {
    @Override
    public ArrayList<Integer> call() throws Exception {
        return fetchLargeData();
    }
});

There are a bunch of other operators for creating Observable objects. The one most often used in tutorials is Observable.just(). However, the argument passed to just() is evaluated immediately on the calling thread, so Observable.just(fetchLargeData()) would block the UI thread and does not serve our purpose. So we use Observable.fromCallable() instead. It gives two advantages:

  • The code that creates the values to be published is not run until an Observer subscribes to the Observable.
  • The code can be run on a different thread.
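The difference between eager and deferred evaluation can be seen without RxJava at all. The sketch below is a plain-Java analogy, not RxJava code: expensiveFetch(), eager() and deferred() are hypothetical names, and a counter stands in for the long-running work.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

class DeferredEvaluationSketch {

    static final AtomicInteger calls = new AtomicInteger();

    // Stand-in for fetchLargeData(); counts how often it actually runs.
    static int expensiveFetch() {
        calls.incrementAndGet();
        return 5;
    }

    // Eager: the value is computed before the wrapper even exists,
    // much like the argument passed to Observable.just(...).
    static Callable<Integer> eager() {
        final int value = expensiveFetch();
        return new Callable<Integer>() {
            @Override public Integer call() { return value; }
        };
    }

    // Deferred: nothing runs until call() is invoked,
    // much like the Callable handed to Observable.fromCallable(...).
    static Callable<Integer> deferred() {
        return new Callable<Integer>() {
            @Override public Integer call() { return expensiveFetch(); }
        };
    }

    public static void main(String[] args) throws Exception {
        eager();
        System.out.println("after eager(): " + calls.get());      // 1
        Callable<Integer> d = deferred();
        System.out.println("after deferred(): " + calls.get());   // still 1
        d.call();
        System.out.println("after d.call(): " + calls.get());     // 2
    }
}
```

Because the deferred version does no work until it is invoked, the caller is free to choose when, and on which thread, that invocation happens.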

Create an Observer

Now that the Observable is ready, let’s create an Observer. Create an instance of the Observer interface and implement its methods.

Observer<ArrayList<Integer>> dataObserver = new Observer<ArrayList<Integer>>() {
    @Override
    public void onSubscribe(Disposable d) {
        compositeDisposable.add(d);
        Toast.makeText(getApplicationContext(), "onSubscribe", Toast.LENGTH_SHORT).show();
    }

    @Override
    public void onNext(ArrayList<Integer> integers) {
        Toast.makeText(getApplicationContext(), "onNext: " + integers.size(), Toast.LENGTH_SHORT).show();
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(getApplicationContext(), "onError", Toast.LENGTH_SHORT).show();
    }

    @Override
    public void onComplete() {
        Log.d("RxJava", "onComplete: " + Thread.currentThread().getName());
        Toast.makeText(getApplicationContext(), "onComplete: " + Thread.currentThread().getName(), Toast.LENGTH_SHORT).show();
    }
};

Subscribe – Connect Observable and Observer

The Observable and the Observer are connected through the subscribe() method.

aSyncDataObservable.subscribe(dataObserver);

As soon as subscribe() is called, the onSubscribe() method of the Observer is called, and the Observable makes the call to fetchLargeData().

The catch here is that all of the above operations happen on the main UI thread, so we still end up in an Application Not Responding scenario. How do we solve that? This is where schedulers come in.

Use Schedulers

To use Schedulers we have to use another operator called subscribeOn(). subscribeOn() modifies the observable and tells the RxJava framework to run the code, including the code that gets run upon subscription, on a different thread. This means the logic in our Callable object, including the call to fetchLargeData(), will run on a different thread. So we modify our code like this:

aSyncDataObservable
        .subscribeOn(Schedulers.computation())
        .subscribe(dataObserver);

For the time being, think of a scheduler as a separate thread (or a pool of threads). RxJava ships with its own schedulers for this purpose, and here we use Schedulers.computation(). There are a bunch of other schedulers available, but for our purpose Schedulers.computation() is enough.
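As a rough mental model of what moving work onto a scheduler means, the following plain-Java sketch submits a Callable to a small thread pool and reports which thread ran it. This is an analogy only, not how RxJava implements subscribeOn(); the class and method names are hypothetical, and the pool size of 2 is an arbitrary choice for the demo.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BackgroundWorkSketch {

    // Runs the work on a pool thread and returns the name of the thread used.
    static String workerThreadName() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<String> result = pool.submit(new Callable<String>() {
                @Override public String call() {
                    return Thread.currentThread().getName();
                }
            });
            return result.get();   // blocks the caller, acceptable only in a demo
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("worker: " + workerThreadName());
    }
}
```

The two printed thread names differ, which is the property subscribeOn() gives us for the call to fetchLargeData().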

Since our Observable is set to run on the computation scheduler, it is also going to interact with our Observer on the computation scheduler. This means our onNext() will be called on a computation thread. We cannot show a Toast from a non-UI thread, so the app will crash.

We can tell the RxJava framework to run the Observer code on the UI thread by simply using the observeOn() operator. Our final code looks like this:

aSyncDataObservable
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(dataObserver);
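The two-step hop in the chain above (produce on one thread, consume on another) can be imitated in plain Java with CompletableFuture and two executors. This is only an analogy under stated assumptions: the thread names "worker" and "fake-ui" and the helper named() are invented for the sketch, and a single-thread executor stands in for Android's main thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class ThreadHopSketch {

    // Creates threads with a fixed, recognisable name.
    static ThreadFactory named(final String name) {
        return new ThreadFactory() {
            @Override public Thread newThread(Runnable r) { return new Thread(r, name); }
        };
    }

    // Returns [thread that produced the value, thread that consumed it].
    static List<String> run() {
        ExecutorService worker = Executors.newSingleThreadExecutor(named("worker"));
        ExecutorService ui = Executors.newSingleThreadExecutor(named("fake-ui"));
        final List<String> threads = Collections.synchronizedList(new ArrayList<String>());
        try {
            CompletableFuture
                .supplyAsync(() -> {                    // plays the role of subscribeOn(...)
                    threads.add(Thread.currentThread().getName());
                    return 5;
                }, worker)
                .thenAcceptAsync(size ->                // plays the role of observeOn(...)
                    threads.add(Thread.currentThread().getName()), ui)
                .join();                                // wait for both stages, demo only
        } finally {
            worker.shutdown();
            ui.shutdown();
        }
        return new ArrayList<>(threads);
    }

    public static void main(String[] args) {
        System.out.println(run());   // [worker, fake-ui]
    }
}
```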

Unsubscribe

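Every time an Observer subscribes to an Observable, we get a Disposable (RxJava 1 called it a Subscription). In the code above it arrives in onSubscribe() and is added to compositeDisposable. It needs to be disposed of when it is no longer required, for example when the Activity is destroyed. This is where RxJava has a definite advantage over AsyncTask: we can unsubscribe whenever required and no longer have to worry about the state of the UI.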
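The idea behind unsubscribing can be sketched in plain Java as a handle that, once disposed, stops results from reaching the listener. SimpleDisposable and deliver() below are hypothetical names modelled loosely on RxJava 2's Disposable, not its actual implementation. In the real code, the equivalent step would be calling compositeDisposable.clear() or compositeDisposable.dispose(), typically from onDestroy().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class DisposeSketch {

    // Hypothetical minimal handle, modelled loosely on RxJava 2's Disposable.
    static final class SimpleDisposable {
        private final AtomicBoolean disposed = new AtomicBoolean(false);
        void dispose() { disposed.set(true); }
        boolean isDisposed() { return disposed.get(); }
    }

    // Delivers a result to the listener only while the handle is still live.
    static void deliver(int result, SimpleDisposable handle, List<Integer> received) {
        if (!handle.isDisposed()) {
            received.add(result);
        }
    }

    // Delivers one result, disposes, then tries to deliver another.
    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        SimpleDisposable handle = new SimpleDisposable();
        deliver(1, handle, received);   // arrives
        handle.dispose();               // e.g. triggered when the screen goes away
        deliver(2, handle, received);   // dropped, the UI is never touched
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());   // [1]
    }
}
```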

More Learning

  • Play around with different Schedulers
  • Use just operator to fetch large data
  • Use just operator to fetch large data using Schedulers
  • Learn more RxJava operators: map(), flatMap(), fromArray(), etc.
  • Combine multiple operators

What Next

Using RxJava and Data Binding, we can build really nice MVVM patterns for Android app development. That's next.
