Codepath

RxJava

Overview

RxJava is described officially as "a library for composing asynchronous and event-based programs by using observable sequences". But what does this actually mean? Let's put this library into context.

One of the challenges in writing robust Android apps is the dynamic nature of changing inputs. In traditional imperative programming models, values have to be explicitly set on variables for them to be updated. If one dependent value changes, the value will not be updated without adding another line of code. Consider the following example:

// init variables
int i, j, k; 

// Init inputs
i = 1;
j = 2;

// Set output value
k = i + j;

// Update a dependent value
j = 4;
k = ?  // What should k be?

State variables such as k intrinsically may not reflect the current value of its inputs. Traditional asynchronous programming approaches tend to rely on callbacks to update these changes, but this way can lead to a problem known as callback hell. Reactive programming (see an intro here) addresses these issues by providing a framework to describe outputs to reflect their changing inputs. RxJava, which is a port of the Reactive Extensions library from .NET, enables Android apps to be built in this style.

Check out Kaushik's video introduction to RxJava talk which provides a solid overview of the library. For a tutorial, check out this tutorial by Chris Arriola.

Advantages

Here are a few advantages of using RxJava on Android:

  • Simplifies the ability to chain async operations. If you need to make an API call that depends on another API call, you will likely end up implementing this call in the callback of the first one. RxJava provides a way to avoid the need to create layers of callbacks to address this issue. For this reason, RxJava became popular within Netflix in 2014 for abstracting away the complexities of performing dependent API calls.

  • Exposes a more explicit way for declaring how concurrent operations should operate. Although RxJava is single-threaded by default, RxJava helps enable you to define more explicitly what type of threading models should be used for both background and callback tasks. Since Android only allows UI updates on the main thread, using RxJava helps make the code more clear about what operations will be done to update the views.

  • Surfaces errors sooner. One issue with AsyncTask is that errors that occur on the background thread are hard to pass along when updating the UI thread using the onPostExecute() method. In addition, there are limitations in how many AsyncTasks can be dispatched concurrently as described in this blog post. RxJava provides a way to enable these errors to be surfaced.

  • Helps reduce the need for state variables that can introduce bugs. One mindset shift need in using RxJava is thinking about everything in terms of describing how data flows in the system. Click events generated by the user, network calls, data updates from external sources all can all be described as asynchronous streams of data. The power of RxJava is that it enables these streams to be transformed, filtered, or used to create new streams of data with only a few lines of code while also minimizing the need for storing state variables.

Setup

Setup your app/build.gradle:

dependencies {
  implementation 'io.reactivex:rxandroid:1.2.0'
  implementation 'io.reactivex:rxjava:1.1.4'
}

If you plan to use RxJava, it is highly recommended that you setup lambda expressions to reduce the verbose syntax usually needed. The guide includes instructions about how to update your existing Android Studio projects to leverage it.

Observables and Observers

The basic building blocks of reactive code are Observables and Observers. An Observable emits items; an Observer consumes those items. An Observable may emit any number of items (including zero items), then it terminates either by successfully completing, or due to an error.

An Observable can then have any number of observers. For each Observer attached, an Observable calls Observer#onNext() for every item, followed by either Observer#onComplete() or Observer#onError(). Keep in mind that Observables often don't start emitting items until there's at least one subscriber.

Defining Observables

Let's take the most basic example to understand how this is structured. First, let's define an Observable which is an object that can emit any number of items to be processed:

// Observables emit any number of items to be processed
// The type of the item to be processed needs to be specified as a "generic type"
// In this case, the item type is `String`
Observable<String> myObservable = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> sub) {
            // "Emit" any data to the subscriber
            sub.onNext("a");
            sub.onNext("b");
            sub.onNext("c");
            // Trigger the completion of the event
            sub.onCompleted();
        }
    }
);

This observable event emits the data "a", "b", "c" and then completes.

Defining Observers

Now let's create a Observer to consume this emitted data from the Observable:

Observer<String> mySubscriber = new Observer<String>() {
    // Triggered for each emitted value
    @Override
    public void onNext(String s) { System.out.println("onNext: " + s); }

    // Triggered once the observable is complete
    @Override
    public void onCompleted() { System.out.println("done!"); }

    // Triggered if there is any errors during the event
    @Override
    public void onError(Throwable e) { }
};

Subscribing to Observables

To implement an observer for these events, the following interface must be defined:

public interface Observer<T> {
    void onNext(T t); // called for each "emitted" item
    void onCompleted(); // will not be called if onError() is called
    void onError(Throwable e); // called if there's an error
}

Note that an Observer is a generic type. It must represent the type of value that the Observable will emit. For a subscriber to start watching an observable that will generate string types, it must subscribe to it:

Observable.just("a", "b", "c").subscribe(new Observer<String>() {
    // Triggered for each emitted value
    // Invoked with "a", then "b", then "c"
    @Override
    public void onNext(String s) { System.out.println("onNext: " + s); }

    // Triggered once the observable is complete
    @Override
    public void onCompleted() { System.out.println("done!"); }

    // Triggered if there is any errors during the event
    @Override
    public void onError(Throwable e) { }
});

This example above would simply print each argument ("a", "b", "c") and then exit since each item is invoked with a call to onNext. Once all items have been invoked, the onCompleted method is called.

This might seem contrived and not particularly useful in this example but as layers are built on top of these foundations, we begin to see the power of RxJava.

An Observer can be attached to an Observable in order to respond to emitted data with:

// Attaches the subscriber above to the observable object
myObservable.subscribe(mySubscriber);
// Outputs:
// onNext: "a"
// onNext: "b"
// onNext: "c"
// done!

This example would simply print each emitted item and then exit since each item is invoked with a call to onNext. Once all items have been invoked, the onCompleted method of the Observer is called.

Other ways to create Observables

As demonstrated above, an Observer watches for result values emitted by the Observable. When these events occur, the role of the subscriber is to respond to these events. An Observable can be created from any type of input.

Just about anything to be considered an asynchronous stream of data. For instance, an Observable can be a set of string items that should be iterated:

// `just` generates an observable object that emits each letter and then completes the stream
Observable.just("a", "b", "c");

You can create existing arrays as well:

ArrayList<String> items = new ArrayList();
items.add("red");
items.add("orange");
items.add("yellow");

Observable.from(items);

If you are trying to convert internal code into Observables, consider using Observable.defer() instead, especially for code that may cause your code to block the thread:

private Object slowBlockingMethod() { ... }

public Observable<Object> newMethod() {
    return Observable.just(slowBlockingMethod()); // slow blocking method will be invoked when created
}

Instead, we should use defer() to create the observable going:

public Observable<Object> newMethod() {
    return Observable.defer(() -> Observable.just(slowBlockingMethod()));
}

See this blog post for more details. For a complete list of the different ways of creating an Observable, check out this link.

Schedulers

RxJava is synchronous by default, but work can be defined asynchronously using schedulers. For instance, we can define that the network call should be done on a background thread, but the callback should be done on the main UI thread.

Observable.from(Arrays.asList(1,2,3,4,5))
	.subscribeOn(Schedulers.newThread())
	.observeOn(AndroidSchedulers.mainThread()))
	.subscribe(new Subscriber<String>() {
		@Override
		public void onCompleted() {
			//called on completion
		}
		
		@Override
		public void onError(final Throwable e) {
			//called when error occurs
		}
		
		@Override
		public void onNext(final String s) {
			Log.d("emit", s);
		}
	});

Here are the things to note from the example code above:

  • subscribeOn(Schedulers.newThread()): This will make the Observable do its background work in a new thread
  • .observeOn(AndroidSchedulers.mainThread())): This makes the subscriber action to execute its result on Android's main UI thread. This is very important especially when change to a UI component needs to be made based on the result.
  • .subscribe(): Subscribes an Observer to the Observable. The Observers onNext method is called on each emitted item, followed by either onCompletion if it runs successfully or onError if an error occurs.

Using schedulers relies on queuing the work through bounded or unbounded thread pools. Here are a few options available that come with RxJava. See this link for all the possible options.

Name Description
Schedulers.computation() fixed number of threads (= to # CPU's)
Schedulers.immediate() current thread
Schedulers.io() backed by a thread pool that can expand as needed
Schedulers.newThread() create a new thread
Schedulers.trampoline() schedule work on the current thread but put on a queue

These schedulers can then be used to control which thread an observable or the subscriber are operating on using subscribeOn() and observeOn().

Schedulers come out of the box with RxJava. The RxAndroid library comes with AndroidSchedulers.mainThread(), which is a convenient way to access the main UI thread on Android.

Replacing AsyncTask with Observables

We can replace any AsyncTask calls with RxJava calls instead using Observable. Similar to how AsyncTask performs the task in the background and then calls onPostExecute() on the main thread on the UI, RxJava can accomplish this same function by defining which thread to perform the task with subscribeOn(), and then where to define the callback thread with observeOn():

// This constructs an `Observable` to download the image
public Observable<Bitmap> getImageNetworkCall() {
    // Insert network call here!
}

// Construct the observable and use `subscribeOn` and `observeOn`
// This controls which threads are used for processing and observing
Subscription subscription = getImageNetworkCall()
    // Specify the `Scheduler` on which an Observable will operate
    .subscribeOn(Schedulers.io()) 
    // Specify the `Scheduler` on which a subscriber will observe this `Observable`
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(new Subscriber<Bitmap>() {

        // This replaces `onPostExecute(Bitmap bitmap)`
        @Override
        public void onNext(Bitmap bitmap) {
             // Handle result of network request
        }

        @Override
        public void onCompleted() {
             // Update user interface if needed
        }

        @Override
        public void onError() {
             // Update user interface to handle error
        }

});

Using this combination of Observable, Subscriber and thread scheduling, we can see the power of RxJava. But there's a lot more that can be done.

Using RxJava with Retrofit

RxJava can be used with Retrofit to provide the ability to chain multiple network calls. It also provides a basic abstraction for surfacing errors if one of these network calls should fail. The section below shows you how to setup Retrofit with RxJava.

The Retrofit library simply wraps a synchronous network call as an Observable type for use with RxJava. Declaring the endpoints as Observable automatically does this work.

public interface MyApiEndpointInterface {
  @GET("/users/{username}")
  Observable<User> getUser(@Path("username") String username);
}

We can then instantiate an instance of this interface and get back an Observable type:

MyApiEndpointInterface apiService =
    retrofit.create(MyApiEndpointInterface.class);

// Get the observable User object
Observable<User> call = apiService.getUser(username);
// To define where the work is done, we can use `observeOn()` with Retrofit
// This means the result is handed to the subscriber on the main thread
call.subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Subscriber<User>() {
    @Override
    public void onNext(User user) {
       // Called once the `User` object is available
    }

    @Override
    public void onCompleted() {
      // Nothing to do here
    }

    @Override
    public void onError(Throwable e) {
      // cast to retrofit.HttpException to get the response code
      if (e instanceof HttpException) {
         HttpException response;
         int code = response.code();
      }
    }
  });

The RxAndroid library includes AndroidSchedulers.mainThread() for allowing callbacks to be fired on the main UI thread.

Hot vs. Cold Observables

By default, Observables are initialized to begin executing after the first subscriber is attached. Retrofit, for instance, by default operates in this way, which are known as cold observables. You can take a look at the Retrofit source code to see that the network request is made on the first subscription.

Cold to Hot Observables

If you wish to change it so that multiple subscribers are attached before executing the request, otherwise known as converting to a hot observable, you need to convert the Observable to an ConnectableObservable. To initiate the network request, you need to call connect() on the observable:

Observable<User> call = apiService.getUser(username);

// convert Observable to ConnectedObservable, get a reference so we can call connect() later
ConnectableObservable<User> connectedObservable = call.publish();

// define 1st observer
Observer<User> observer1 = new Observer<User>() {
   @Override
   public void onCompleted() {

   }

   @Override
   public void onError(Throwable e) {

   }

   @Override
   public void onNext(User user) {
      // do work here
   }
};        

// define 2nd observer here
Observer<User> observer2 = new Observer<User>() { 
}

// observer is subscribing
connectedObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(observer1);
connectedObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(observer2);

// initiate the network request
connectedObservable.connect();

Hot to Cold Observables

You can also turn a hot observable back to a cold observable by using autoConnect(). Instead of needing to call an explicit connect() and passing around ConnectedObservable types, you can use this approach to enable the next subscriber to trigger a network request upon the next subscription:

// back to cold observable
Observable<User> observable = connectedObservable.autoConnect();  

// define 3rd observer here
Observer<User> observer3 = new Observer<User>() { 
}

observable.subscribe(observer3);

Handling Configuration Changes

If an Activity is destroyed because of a change in screen orientation, the Observable will fire again in onCreate() and this will result in duplication of work because the same API call will be made again and all progress made initially will be lost. To solve this problem, you need to first store a reference to the Observable by using singletons or using retained fragments and use the cache() operator:

Observable<User> call = apiService.getUser(username).cache();

Future observers will get the previous emitted items from the Observables as soon as it subscribes:

Subscription subscription = observable.subscribe(observer1);

If you wish to have more granular control over the buffer size or time span in which cached events will be emitted, you should use the replay() operator:

ConnectableObservable<User> call = apiService.getUser(username).replay(10, TimeUnit.MINUTES);

Also note that the replay() turns the observable into a hot observable, which will not emit events until connect() is called.

Avoiding Memory Leaks

The flexibility in being able to schedule observables on different threads and have them operate as long-running tasks can make it easy to contribute the memory leaks. One of the reasons is that observers are often created with anonymous classes. In Java, creating anonymous classes requires the inner class retaining an instance to the containing class as discussed in this Stack Overflow article.

An observer that is created within an Activity or Fragment therefore can hold a reference that will be unable to be garbage collected if the observable is still running. There are several different approaches suggested. Both approaches attempt to manage the subscriptions created from attaching an observer to an observable and canceling them when a lifecycle event occurs.

Composite Subscriptions

One of the simplest approach is to simply instantiate a CompositeSubscription object inside your Activity or Fragment.

public class MainActivity extends AppCompatActivity {

    private CompositeSubscription subscriptions;

    protected void onCreate(Bundle savedInstanceState) {
       subscriptions = new CompositeSubscription();
    }
}

We can then use CompositeSubscription to track any subscriptions created by using the add() method:

subscriptions.add(connectedObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(observer1))
subscriptions.add(connectedObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(observer1));

Canceling these subscriptions should occur during the onPause() method when the Activity or Fragment is suspended:

@Override
public void onPause() {
   super.onPause();

   if (subscriptions != null) {
      subscriptions.clear();
   }
}
Once a clear() call is made, the CompositeSubscription can be reused.

RxBinding

See this guide on how to implement RxJava with Android views. The RxBinding library simply implements Observable events for many standard Android views, making it easier to turn your UI events to leverage reactive-based programming.

RxLifecycle

There is also a library called RxLifecycle which provides support for managing the lifecycle of activities and fragments. In the past RxAndroid provided this support with AndroidObservables, but a decision was made to simplify the library. See this release note for more context.

To setup, these Gradle lines must be added:

implementation 'com.trello:rxlifecycle:0.4.0'
implementation 'com.trello:rxlifecycle-components:0.4.0'

RxLifecycle requires subclassing all activities with RxActivity or RxAppCompatActivity.

Chaining Observables

For a better understanding about how subscriptions can be chained and how RxJava works in general, it's best to first to understand what happens beneath the surfaces when this subscribe() call is made. Beneath the covers Subscriber objects are created. If we wish to chain the input, there are various operators that are available that map one Subscriber type to another.

For more context, watch this video talk.

References

Fork me on GitHub