Updated on 2025-04-06

Getting Started with RxJava: A Guide with Examples for Android Development

RxJava's GitHub homepage is given below; setup is straightforward, so it is not covered here.
/ReactiveX/RxJava

Basics
The two core concepts in RxJava are Observables (the observed, i.e. event sources) and Subscribers (observers). An Observable emits a series of events, and Subscribers handle them. An event can be anything you are interested in (touch events, data returned by a web API call, and so on).

An Observable can emit zero or more events until it completes or fails with an error. Each time an event is emitted, the Subscriber's onNext method is called; the stream then ends with a call to either Subscriber.onCompleted() or Subscriber.onError().
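The event contract described above can be sketched in plain Java without the RxJava library. EventSink and EventContractSketch are hypothetical names invented for this illustration, and routing a thrown exception to onError is an assumption of the sketch, not a description of RxJava's internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical callback interface mirroring the three Subscriber methods named in the text. */
interface EventSink<T> {
    void onNext(T item);
    void onCompleted();
    void onError(Throwable e);
}

class EventContractSketch {
    /** Delivers every item through onNext, then makes exactly one terminal call. */
    static <T> void emit(Iterable<T> items, EventSink<T> sink) {
        try {
            for (T item : items) {
                sink.onNext(item);
            }
        } catch (RuntimeException e) {
            sink.onError(e);   // terminal: nothing follows an error
            return;
        }
        sink.onCompleted();    // terminal: normal end of the stream
    }

    /** Emits the given items and returns a log of the callbacks that fired, in order. */
    static List<String> run(List<String> items) {
        List<String> log = new ArrayList<>();
        emit(items, new EventSink<String>() {
            @Override public void onNext(String item) { log.add("onNext " + item); }
            @Override public void onCompleted() { log.add("onCompleted"); }
            @Override public void onError(Throwable e) { log.add("onError"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b")));   // two events, then completion
        System.out.println(run(new ArrayList<>()));         // zero events, then completion
    }
}
```

The second call shows the "zero events" case: the only callback that fires is onCompleted.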

RxJava looks a lot like the observer pattern from classic design patterns, but there is one clear difference: if an Observable does not have any Subscriber, it will not emit any events.
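A minimal plain-Java sketch of this "nothing happens until someone subscribes" behaviour follows. LazySource is a hypothetical stand-in written for this illustration; it is not how RxJava implements Observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for an Observable; it only stores the emitting code. */
class LazySource<T> {
    private final Consumer<Consumer<T>> onSubscribe;

    LazySource(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    /** The stored emitting code runs here, once per subscriber, and not before. */
    void subscribe(Consumer<T> subscriber) {
        onSubscribe.accept(subscriber);
    }
}

class LazySourceDemo {
    /** Returns a log of what happened, in order. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        LazySource<String> source = new LazySource<>(subscriber -> {
            log.add("emitting");
            subscriber.accept("Hello, world!");
        });
        log.add("created");   // at this point nothing has been emitted
        source.subscribe(item -> log.add("received " + item));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The log shows "created" before "emitting": constructing the source does no work, and the emitting code only runs inside subscribe.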

Hello World
Creating an Observable object is simple: just call Observable.create directly.

Observable<String> myObservable = Observable.create(
  new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> sub) {
      sub.onNext("Hello, world!");
      sub.onCompleted();
    }
  }
);


The Observable object defined here only emits a Hello World string and then ends. Next we create a Subscriber to process the strings emitted by the Observable object.

Subscriber<String> mySubscriber = new Subscriber<String>() { 
  @Override 
  public void onNext(String s) { System.out.println(s); }
 
  @Override 
  public void onCompleted() { } 
 
  @Override 
  public void onError(Throwable e) { } 
}; 

Here the subscriber simply prints the string emitted by the observable. The subscribe method connects the myObservable object we defined to mySubscriber, which completes the subscription.

myObservable.subscribe(mySubscriber);

Once mySubscriber subscribes to myObservable, myObservable calls the onNext and onCompleted methods of mySubscriber, and mySubscriber prints "Hello, world!".

Cleaner code
Does it seem like too much code just to print "Hello, world!"? I used this verbose form mainly to show the principles behind RxJava. RxJava actually provides many convenience functions that reduce the amount of code.

First, let's look at how to simplify creating the Observable. RxJava has many built-in functions for creating Observables; for example, Observable.just creates an Observable that emits a single item and then completes. The creation code above can then be reduced to one line.

Observable<String> myObservable = Observable.just("Hello, world!");

Next, let's look at how to simplify the Subscriber. In the example above we do not actually care about onCompleted and onError. We only need to do some processing in onNext, and for that we can use the Action1 class.

Action1<String> onNextAction = new Action1<String>() { 
  @Override 
  public void call(String s) { 
    System.out.println(s);
  } 
}; 

There is an overloaded version of the subscribe method that accepts three actions, corresponding to onNext, onError, and onCompleted (the first two are Action1 instances; the completion callback takes no argument and is an Action0).

myObservable.subscribe(onNextAction, onErrorAction, onCompleteAction);

We don't care about onError and onCompleted here, so we only need the first parameter.

 
myObservable.subscribe(onNextAction);
// Outputs "Hello, world!" 

Putting it together, the code above can be written like this:

Observable.just("Hello, world!")
  .subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
      System.out.println(s);
    }
  });

Java 8 lambdas make the code even simpler:

Observable.just("Hello, world!")
  .subscribe(s -> System.out.println(s));


In Android development, it is highly recommended to use the Gradle plugin for Retrolambda so that you can use lambdas in your code.

Transform
Let's do something more interesting!
Say I want to append my signature to "Hello, world!". The first idea might be to modify the Observable itself:

Observable.just("Hello, world! -Dan")
  .subscribe(s -> System.out.println(s));

If you control the Observable, that is certainly fine, but what if you cannot modify it, for example because it is provided by a third-party library? Or what if the Observable has multiple Subscribers and you only want to change the result for one of them?
So how about modifying the event in the Subscriber instead? For example:

Observable.just("Hello, world!")
  .subscribe(s -> System.out.println(s + " -Dan"));

This approach is still unsatisfying. I want my Subscribers to be as lightweight as possible, because I might be running them on the main thread. Moreover, in the spirit of functional reactive programming, a Subscriber should react to the events emitted by an Observable rather than modify them. Wouldn't it be cool if I could transform "Hello, world!" in some intermediate step?
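RxJava provides operators such as map for this kind of intermediate step. To show the idea without depending on the library, here is a plain-Java sketch; TinySource and its methods are hypothetical names written for this illustration and are far simpler than RxJava's Observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical single-purpose source type, used only to illustrate an intermediate step. */
class TinySource<T> {
    private final Consumer<Consumer<T>> onSubscribe;

    private TinySource(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    /** Creates a source that emits exactly one item to each subscriber. */
    static <T> TinySource<T> just(T item) {
        return new TinySource<>(subscriber -> subscriber.accept(item));
    }

    /** Returns a new source whose items are this source's items passed through fn. */
    <R> TinySource<R> map(Function<T, R> fn) {
        return new TinySource<>(downstream ->
                onSubscribe.accept(item -> downstream.accept(fn.apply(item))));
    }

    void subscribe(Consumer<T> subscriber) {
        onSubscribe.accept(subscriber);
    }
}

class TinySourceDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        TinySource.just("Hello, world!")
                .map(s -> s + " -Dan")        // the intermediate transformation
                .subscribe(received::add);    // the subscriber only reacts
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Neither the original source nor the subscriber changes here: the signature is added by a separate step that sits between them.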

Example

We will use the OpenWeatherMap API as a demonstration. OpenWeatherMap (/) is a free weather data API that is very easy to configure and use. To call it, you only pass location information (a city name or geographic coordinates) as parameters.
Calling an API usually involves the following steps (each with its share of boilerplate code):

Create the required model classes (adding annotations if necessary).
Implement the network layer that manages requests and responses, including error handling.
Make the request on a background thread (usually as an asynchronous task) and use a callback function to present the response on the UI thread.

Create a model class

The first step can be (semi-)automated with a JSON-to-POJO generation tool such as jsonschema2pojo. The model classes for the OpenWeatherMap API are as follows:

public class WeatherData {
 
  public Coordinates coord;
  public Local sys;
  public List<Weather> weather; // named after the "weather" array in the JSON response
  public String base;
  public Main main;
  public Wind wind;
  public Rain rain;
  public Cloud clouds;
  public long id;
  public long dt;
  public String name;
  public int cod;
 
  public static class Coordinates {
    public double lat;
    public double lon;
  }
 
  public static class Local {
    public String country;
    public long sunrise;
    public long sunset;
  }
 
  public static class Weather {
    public int id;
    public String main;
    public String description;
    public String icon;
  }
 
  public static class Main {
    public double temp;
    public double pressure;
    public double humidity;
    public double temp_min;
    public double temp_max;
    public double sea_level;
    public double grnd_level;
  }
 
  public static class Wind {
    public double speed;
    public double deg;
  }
 
  public static class Rain {
    public int threehourforecast;
  }
 
  public static class Cloud {
    public int all;
  }
 
}

Use Retrofit to implement network calls

In the second step we would normally write a lot of boilerplate to implement the network call, but Square's Retrofit component (/retrofit/) greatly reduces the amount of code. Just create an interface (using annotations to describe the whole request) and then let Retrofit create the client from it. Retrofit can also handle JSON serialization and deserialization.

private interface ApiManagerService {
  @GET("/weather")
  WeatherData getWeather(@Query("q") String place, @Query("units") String units);
}

In the example above, the annotation on the method consists of an HTTP method (GET here; Retrofit also supports POST, PUT, DELETE and HEAD as needed) and a relative path (the base path is supplied when the RestAdapter is built). The @Query annotation assembles the request parameters. We have two: place (the location) and units (the units of measurement).
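To make the role of the relative path and the two query parameters concrete, here is a plain-Java sketch that assembles such a request path by hand. QueryUrlSketch and buildRelativeUrl are hypothetical helpers for illustration only; Retrofit does this internally and its exact encoding rules may differ.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class QueryUrlSketch {
    /** Appends URL-encoded query parameters to a relative path, in insertion order. */
    static String buildRelativeUrl(String path, Map<String, String> query) {
        StringJoiner joined = new StringJoiner("&", path + "?", "");
        for (Map.Entry<String, String> entry : query.entrySet()) {
            joined.add(encode(entry.getKey()) + "=" + encode(entry.getValue()));
        }
        return joined.toString();
    }

    private static String encode(String value) {
        // The Charset overload of URLEncoder.encode requires Java 10 or later.
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, String> query = new LinkedHashMap<>();
        query.put("q", "Budapest,hu");   // corresponds to @Query("q") String place
        query.put("units", "metric");    // corresponds to @Query("units") String units
        System.out.println(buildRelativeUrl("/weather", query));
        // prints "/weather?q=Budapest%2Chu&units=metric"
    }
}
```

The resulting string is the part of the URL that follows the base path.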

Let's look at a specific call example (in actual code, this call should be placed in a non-UI thread). This code is relatively easy to understand:

//...
final RestAdapter restAdapter = new RestAdapter.Builder()
  .setServer("/data/2.5")
  .build();

final ApiManagerService apiManager = restAdapter.create(ApiManagerService.class);
final WeatherData weatherData = apiManager.getWeather("Budapest,hu", "metric");
//...

As you can see, it is very simple: a small amount of code implements the entire call. This is the power of Retrofit.

Reactive programming with RxJava

Now for step 3: the RxJava part! Our example uses it to make the request calls asynchronous.
First, we need to replace the interface created earlier with this class:

public class ApiManager {
 
  private interface ApiManagerService {
    @GET("/weather")
    WeatherData getWeather(@Query("q") String place, @Query("units") String units);
  }
 
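  private static final RestAdapter restAdapter = new RestAdapter.Builder()
    .setServer("/data/2.5")
    .build();
  private static final ApiManagerService apiManager = restAdapter.create(ApiManagerService.class);

  public static Observable<WeatherData> getWeatherData(final String city) {
    // OnSubscribeFunc is the pre-1.0 RxJava callback type that matches the onSubscribe signature below.
    return Observable.create(new Observable.OnSubscribeFunc<WeatherData>() {
      @Override
      public Subscription onSubscribe(Observer<? super WeatherData> observer) {
        try {
          // Blocking network call; its result is emitted as the single event.
          observer.onNext(apiManager.getWeather(city, "metric"));
          observer.onCompleted();
        } catch (Exception e) {
          observer.onError(e);
        }

        return Subscriptions.empty();
      }
    }).subscribeOn(Schedulers.threadPoolForIO());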
  }
 
}

Let's first look at the getWeatherData() method. It calls Observable.create(), passing in an Observable.OnSubscribeFunc implementation, and returns the resulting Observable. The Observable starts working as soon as it is subscribed to. Each result it produces is passed as a parameter to the observer's onNext() method. Because we only want the network requests to run concurrently, each Observable makes the request exactly once and then calls onCompleted(). The subscribeOn() method is very important here: it determines which thread the work runs on. Here it is given Schedulers.threadPoolForIO(), a scheduler intended for I/O- and network-bound work.
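The effect of moving work onto an I/O pool can be sketched with the standard java.util.concurrent classes. This is a rough analogue and not what subscribeOn() does internally; IoPoolSketch and callOnIoPool are hypothetical names, and the blocking call is simulated.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class IoPoolSketch {
    /** Runs the given blocking call on a pool thread instead of the caller's thread. */
    static <T> T callOnIoPool(Supplier<T> blockingCall) {
        ExecutorService ioPool = Executors.newCachedThreadPool();
        try {
            // join() blocks here only so that the demo can return the result directly.
            return CompletableFuture.supplyAsync(blockingCall, ioPool).join();
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        String worker = callOnIoPool(() -> Thread.currentThread().getName());
        System.out.println("caller thread: " + caller);
        System.out.println("worker thread: " + worker);
    }
}
```

The two printed thread names differ, which is the point: the caller decides where the work runs without changing the work itself.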

The last step is to make the API calls. The following code issues concurrent network requests; each request calls the same URL asynchronously with different parameters:

Observable.from(cities)
  .mapMany(new Func1<String, Observable<WeatherData>>() {
    @Override
    public Observable<WeatherData> call(String s) {
      return ApiManager.getWeatherData(s);
    }
  })
  .subscribeOn(Schedulers.threadPoolForIO())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Action1<WeatherData>() {
    @Override
    public void call(WeatherData weatherData) {
      // do your work
    }
  });

The Observable.from() method converts the array of city names into an Observable that emits the strings one by one, so that each can be processed on its own thread. The mapMany() method then converts each emitted string into an Observable of its own (each new Observable emits the WeatherData for that city). The conversion is done by calling ApiManager.getWeatherData().

The work is again subscribed on the I/O thread pool. On Android, results that need to be displayed in the UI must be published to the UI thread, because only the original thread that created the interface may operate on it. For that, just pass AndroidSchedulers.mainThread() to the observeOn() method. Calling subscribe() triggers the Observable, and the callback is where we handle the results it emits.

This example demonstrates the power of RxJava. Without Rx, we would need to create N threads to make the requests and then hand the results over to the UI thread asynchronously. With Rx, the job takes very little code, and we can use its powerful operators to create, merge, filter and transform Observables.
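For comparison, a hand-written fan-out without Rx might look roughly like the following plain-Java sketch. FanOutSketch and fetchWeather are hypothetical; fetchWeather only simulates the blocking network call, and the Android-specific handoff to the UI thread is left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class FanOutSketch {
    /** Hypothetical stand-in for a blocking network request. */
    static String fetchWeather(String city) {
        return "weather for " + city;
    }

    /** Starts one request per city on a pool, then gathers the results in input order. */
    static List<String> fetchAll(List<String> cities) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, cities.size()));
        try {
            // First pass: start every request so they run concurrently.
            List<CompletableFuture<String>> pending = cities.stream()
                    .map(city -> CompletableFuture.supplyAsync(() -> fetchWeather(city), pool))
                    .collect(Collectors.toList());
            // Second pass: wait for each result on the calling thread.
            return pending.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(Arrays.asList("Budapest,hu", "London,uk")));
    }
}
```

Even in this reduced form, thread pool lifecycle and result gathering have to be managed by hand, which is the bookkeeping the Rx chain hides.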

RxJava is a powerful tool for handling concurrency when developing Android apps. It takes some time to get familiar with, but as the saying goes, sharpening the knife does not delay the chopping of wood: once you master it, it will be a great help. The Reactive Extensions library is a good idea, and we have been using it in Android programs for several weeks (in the near future, the asynchronous task processing in our products will be based entirely on it). The more you understand it, the more you will fall in love with it.