RxJava & Rx Android — Part I

Introduction to RxJava / RxAndroid

Data streams are everywhere. On an Android device, we can create a data stream out of almost anything: user click events, network calls, data storage, variable changes, and even errors.

"In computing, reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change." (Wikipedia)

  • RxJava is a Java VM implementation of ReactiveX.
  • RxAndroid does not replace RxJava.
  • RxAndroid is just a layer on top of RxJava that provides Android-specific support.

RxJava has two main components. If you read the official documentation, or if you have ever implemented RxJava, you will come across these two objects:

  • Observable: an instance of the Observable class. Observables observe data streams and emit their items to subscribed Observers.
  • Observer: an instance of a class implementing the Observer interface. Observers consume the data emitted by Observables.

One Observable can have many Observers. An Observable emits data only if at least one Observer is subscribed to it; if there is no subscription, it emits nothing. (This describes the default, so-called cold, Observables.)

The main Observer methods are:

  • onNext() -> Called each time the Observable emits an item.
  • onError() -> Called if an error occurs.
  • onComplete() -> Called by the Observable when the emission of data is complete.
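
To make the callback order concrete, here is a minimal plain-Java sketch of the contract described above. SimpleObservable, SimpleObserver, and ObserverContractDemo are hypothetical, hand-rolled stand-ins written for this illustration; they are not the RxJava classes, and treating a null item as an error is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an Observer: three callbacks, as described above.
interface SimpleObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onComplete();
}

// Hypothetical stand-in for a cold Observable backed by a list of items.
final class SimpleObservable<T> {
    private final List<T> items;
    int subscribeCount = 0; // how many times the source has actually been run

    SimpleObservable(List<T> items) {
        this.items = items;
    }

    // Nothing is emitted until an observer subscribes.
    void subscribe(SimpleObserver<T> observer) {
        subscribeCount++;
        for (T item : items) {
            if (item == null) {
                // Assumption of this sketch: a null item is an error.
                // onError is terminal, so onComplete is not called afterwards.
                observer.onError(new NullPointerException("null item"));
                return;
            }
            observer.onNext(item);
        }
        observer.onComplete();
    }
}

final class ObserverContractDemo {
    // Subscribes a recording observer and returns the callbacks it received, in order.
    static List<String> run(List<Integer> items) {
        List<String> events = new ArrayList<>();
        new SimpleObservable<>(items).subscribe(new SimpleObserver<Integer>() {
            @Override public void onNext(Integer item) { events.add("onNext " + item); }
            @Override public void onError(Throwable e) { events.add("onError " + e.getClass().getSimpleName()); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }
}
```

Calling ObserverContractDemo.run(Arrays.asList(1, 2)) records two onNext calls followed by onComplete, while a list containing a null stops at onError and never reaches onComplete.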

Schedulers are one of the main components in RxJava. They are responsible for running the operations of an Observable on different threads, which helps to offload time-consuming work from the current thread.

There are different types of Schedulers available in RxJava:

  • Schedulers.io() -> Backed by an unbounded thread pool, meant for non-CPU-intensive tasks such as database interaction, network calls, and file system access. It can be used as shown below:
observable.subscribeOn(Schedulers.io())
  • AndroidSchedulers.mainThread() -> The main thread, or UI thread, where user interaction happens. This Scheduler is provided by the RxAndroid library. It brings execution back to the main thread so that UI modifications can be made, and it is usually passed to observeOn:
observable.observeOn(AndroidSchedulers.mainThread())
  • Schedulers.newThread() -> Creates a new thread for each scheduled task.
observable.subscribeOn(Schedulers.newThread())
  • Schedulers.single() -> Has a single thread that executes tasks one after the other, in the given order.
observable.subscribeOn(Schedulers.single())
  • Schedulers.trampoline() -> Executes tasks on the current thread in first-in, first-out order. It suits recurring tasks. If your code is running on the main thread, this Scheduler adds the code block to a queue on the main thread. It is quite similar to the Immediate Scheduler from RxJava 1.x in that it also blocks the thread; however, it waits for the current task to finish completely, while the Immediate Scheduler invokes the task right away. The trampoline Scheduler comes in handy when we have more than one Observable and want them to execute in order.
observable.subscribeOn(Schedulers.trampoline())
  • Schedulers.computation() -> Similar to the IO Scheduler in that it is also backed by a thread pool. However, the number of threads is fixed to the number of cores available in the system. So if your phone has two cores, the pool has two threads, and if both are busy, new work has to wait until one becomes available. This limitation makes it a poor fit for IO-related work, but it is good for CPU-bound calculations that complete quickly. It is available as:
observable.subscribeOn(Schedulers.computation())
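
For readers who know java.util.concurrent, the pooled Schedulers above can be loosely compared to the JDK's own executors. The sketch below is only an analogy under that assumption; SchedulerAnalogy and its method names are hypothetical, and RxJava's Schedulers are not claimed to be implemented this way.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SchedulerAnalogy {
    // Loosely like Schedulers.io(): threads are created on demand, with no fixed upper bound.
    static ExecutorService ioLike() {
        return Executors.newCachedThreadPool();
    }

    // Loosely like Schedulers.computation(): pool size fixed to the number of cores.
    static ExecutorService computationLike() {
        return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    }

    // Loosely like Schedulers.single(): one thread, tasks run one after the other.
    static ExecutorService singleLike() {
        return Executors.newSingleThreadExecutor();
    }

    // Submits three tasks to the single-thread executor and returns the order in which they ran.
    static List<Integer> runInOrder() {
        ExecutorService single = singleLike();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 1; i <= 3; i++) {
            final int id = i;
            single.execute(() -> order.add(id));
        }
        single.shutdown(); // already-submitted tasks still run
        try {
            single.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }
}
```

Because the single-thread executor processes its queue sequentially, runInOrder() returns the tasks in submission order, which is the same guarantee the single Scheduler is described as giving.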

subscribeOn and observeOn are two concepts in RxJava that confuse a lot of beginners. It took me some time, and some reading on the internet, to understand them.

Let’s add RxJava to our sample Android project and see some of it in action.

  • Add the RxJava dependencies
implementation "io.reactivex.rxjava3:rxjava:3.0.5"
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'

I have taken a simple empty String and used the map operator to print the thread on which each step was called.

observeOn changes the thread on which the subsequent work is done. Whatever came before observeOn still runs on the main thread, or on whichever thread was specified earlier. Below is a simple code block that shows this property:

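Observable.just("")
        .map(s -> {
            // Before observeOn: runs on the calling thread (main here)
            Log.i(TAG, Thread.currentThread().getName());
            return s;
        })
        .observeOn(Schedulers.io())
        .map(s -> {
            // After observeOn: runs on an IO thread
            Log.e(TAG, Thread.currentThread().getName());
            return s;
        })
        .subscribe(s -> {
            // Still downstream of observeOn: runs on the same IO thread
            Log.d(TAG, Thread.currentThread().getName());
        });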

Here is the log output as well, for better understanding:

I/rxmyapp: main
E/rxmyapp: RxCachedThreadScheduler-1
D/rxmyapp: RxCachedThreadScheduler-1

subscribeOn, by contrast, changes the thread for both the prior and the future tasks, which can easily be seen by executing the code block below:

Observable.just("")
        .map(s -> {
            // This will run on the computation thread
            Log.i(TAG, Thread.currentThread().getName());
            return s;
        })
        .subscribeOn(Schedulers.computation())
        .map(s -> {
            // This will also run on the computation thread
            Log.e(TAG, Thread.currentThread().getName());
            return s;
        })
        .subscribe(s -> {
            // This will also run on the computation thread
            Log.d(TAG, Thread.currentThread().getName());
        });

And the log output for the code block

26228-26799 I/rxmyapp: RxComputationThreadPool-2
26228-26799 E/rxmyapp: RxComputationThreadPool-2
26228-26799 D/rxmyapp: RxComputationThreadPool-2

If we use multiple subscribeOn() calls one after another, only the first one, the one closest to the source, is effective. It moves the work to its Scheduler, and the rest have no effect.

Observable.just("")
        .map(s -> {
            // This will run on the computation thread
            Log.i(TAG, Thread.currentThread().getName());
            return s;
        })
        .subscribeOn(Schedulers.computation())
        .map(s -> {
            // This will also run on the computation thread
            Log.e(TAG, Thread.currentThread().getName());
            return s;
        })
        .subscribeOn(Schedulers.io()) // Has no effect on the thread used
        .map(s -> {
            // This will also run on the computation thread
            Log.e(TAG, Thread.currentThread().getName());
            return s;
        })
        .subscribe(s -> {
            // This will also run on the computation thread
            Log.d(TAG, Thread.currentThread().getName());
        });

On the other hand, observeOn changes the thread every time it is called.

Observable.just("")
        .map(s -> {
            // This will be the main thread
            Log.i(TAG, Thread.currentThread().getName());
            return s;
        })
        .observeOn(Schedulers.io())
        .map(s -> {
            // This will be the IO thread
            Log.e(TAG, Thread.currentThread().getName());
            return s;
        })
        .observeOn(Schedulers.computation())
        .map(s -> {
            // This will be the computation thread
            Log.e(TAG, Thread.currentThread().getName());
            return s;
        })
        .subscribe(s -> {
            // This will be the computation thread
            Log.d(TAG, Thread.currentThread().getName());
        });

Some points to remember regarding the behaviour of these two:

  • observeOn works only downstream.
  • subscribeOn works both upstream and downstream.
  • Consecutive subscribeOn calls do not change the thread.
  • Consecutive observeOn calls do change the thread.
  • A thread changed by observeOn cannot be overridden by subscribeOn.
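
The way the two operators combine can be loosely mirrored with the JDK's CompletableFuture, which needs neither RxJava nor Android to run. This is an analogy only: ThreadHopDemo and the thread names are hypothetical, supplyAsync stands in for a source placed on a Scheduler by subscribeOn, and the *Async stages stand in for work placed downstream of an observeOn.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadHopDemo {
    // Single-thread executor whose thread carries a readable name.
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    // Runs a three-stage pipeline and returns which thread each stage ran on.
    static List<String> run() {
        ExecutorService io = named("io-like");
        ExecutorService computation = named("computation-like");
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture
                    // The source starts on the io-like thread (the subscribeOn role).
                    .supplyAsync(() -> {
                        seen.add("source on " + Thread.currentThread().getName());
                        return "item";
                    }, io)
                    // Downstream work is moved to another thread (the observeOn role).
                    .thenApplyAsync(s -> {
                        seen.add("map on " + Thread.currentThread().getName());
                        return s;
                    }, computation)
                    .thenAcceptAsync(
                            s -> seen.add("consume on " + Thread.currentThread().getName()),
                            computation)
                    .join(); // wait for the whole pipeline to finish
        } finally {
            io.shutdown();
            computation.shutdown();
        }
        return seen;
    }
}
```

Here the source stage reports the io-like thread and both later stages report the computation-like thread, matching the rule that the switch affects only what comes after it.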

In the coming posts of this series, we will start building sample applications and look at how to build a reactive Android application using RxJava.

There are still a lot of other basic things about RxJava that need to be covered. Hopefully we will get to them in the next article and then move on to building our Wikipedia Search app. Thank you all :).
