SoFunction
Updated on 2025-03-08

You can learn about Rxjava in one article

Preface:

The first time I came into contact with RxJava was not long ago, when a new Android project was launched, I chose RxJava during evaluation. RxJava is a class library for asynchronous execution based on event subscriptions. It sounds a bit complicated, but in fact, you have to use it once and you will probably understand what it is going on! Why does an Android project start contact RxJava? Because it has been widely recognized in RxJava and is based on the Java language. Of course, developers who are good at organizing and summarizing will think of Android! That's right, RxAndroid is a library developed for Android based on RxJava. Today we will mainly explain RxJava. In the next few blogs, I will take you to understand the use of RxAndroid and Retrofit frameworks. These are some of the most popular technical frameworks at present!

Here is the project address of RxJava on Github:/ReactiveX/RxJava

Technical Documentation Api:/RxJava/javadoc/

Official introduction

1. Support Java 6+

2.3+

3. Asynchronous

4. Those who do not understand the design pattern based on the observer design pattern can move to this:A brief discussion on Java design pattern (15) Observer pattern (Observer)

(subscription)

Officially use RxJava

Use frameworks or libraries for simplicity and convenience, and RxJava is no exception. It can make your code logic more concise. Let's give an example before introducing the gradle code that depends on it:

compile ':rxjava:1.0.14'  
compile ':rxandroid:1.0.1'  

Since it is based on asynchronous, of course, it must be demonstrated in the time-consuming operation of processing! Now let's assume that there is such a requirement:

It is necessary to implement the function of multiple downloaded pictures and displaying them. Its function can add multiple download operations. Since the downloading process is time-consuming, it needs to be executed in the background, and the display of the pictures must be executed in the UI thread. There are many common implementation methods, and I posted one of them here:

new Thread() { 
  @Override 
  public void run() { 
    (); 
    for (File folder : folders) { 
      File[] files = (); 
      for (File file : files) { 
        if (().endsWith(".png")) { 
          final Bitmap bitmap = getBitmapFromFile(file); 
          getActivity().runOnUiThread(new Runnable() { 
            @Override 
            public void run() { 
              (bitmap); 
            } 
          }); 
        } 
      } 
    } 
  } 
}.start(); 

The judgment inside does not look a little dizzy? Of course, this is written by myself. I can see the logic inside clearly at a glance, but if someone else was reading your code, it would be more embarrassing!

Let's take a look at the code using RxJava:

(folders) 
  .flatMap(new Func1<File, Observable<File>>() { 
    @Override 
    public Observable<File> call(File file) { 
      return (()); 
    } 
  }) 
  .filter(new Func1<File, Boolean>() { 
    @Override 
    public Boolean call(File file) { 
      return ().endsWith(".png"); 
    } 
  }) 
  .map(new Func1<File, Bitmap>() { 
    @Override 
    public Bitmap call(File file) { 
      return getBitmapFromFile(file); 
    } 
  }) 
  .subscribeOn(()) 
  .observeOn(()) 
  .subscribe(new Action1<Bitmap>() { 
    @Override 
    public void call(Bitmap bitmap) { 
      (bitmap); 
    } 
  }); 

Is it clear? Although it is not simple, it is the same as before when you get used to it!

If you useAndroidStudioIn other words, when you open the Java file, you will see a preview that is automatically lambdaized, which will give you a clearer view of the program logic:

(folders) 
  .flatMap((Func1) (folder) -> { (()) }) 
  .filter((Func1) (file) -> { ().endsWith(".png") }) 
  .map((Func1) (file) -> { getBitmapFromFile(file) }) 
  .subscribeOn(()) 
  .observeOn(()) 
  .subscribe((Action1) (bitmap) -> { (bitmap) }); 

However, if you don't know much about Java8, this paragraph can be ignored for the time being, but you can move to here to learn about Java8:Introduction to the new features of Java 8

After reading the code, do you feel the urge to meet too late? Don't worry, let's learn about RxJava slowly!

As mentioned earlier, it is based on the Java observer design pattern. There are links to you on this pattern. You can go and have a look. Here is a lot of introductions. Let’s introduce the observer pattern in RxJava:

RxJava's Observer Mode

1. Explanation

1) RxJava has four basic concepts: Observable (observable, that is, the observer), Observer (observer), subscribe (subscribe), and events. Observable and Observer implement subscription relationships through the subscribe() method, so Observable can issue events to notify Observer when needed.

2) Unlike the traditional observer pattern, RxJava's event callback method also defines two special events: onCompleted() and onError() in addition to the ordinary event onNext() (equivalent to onClick() / onEvent()).

3) onCompleted(): The event queue is completed. RxJava not only handles each event individually, but also treats them as a queue. RxJava specifies that when no new onNext() is issued, the onCompleted() method needs to be triggered as a flag.

4) onError(): Event queue exception. When an exception occurs during event processing, onError() will be triggered, and the queue will automatically terminate, and no more events will be issued.

5) In a correctly running event sequence, onCompleted() and onError() have and only one, and are the last one in the event sequence. It should be noted that onCompleted() and onError() are also mutually exclusive, that is, if one of them is called in the queue, the other should not be called again.

2. Realization

1) Create Observer

Observer is an observer, which determines what behavior will occur when the event is triggered. Implementation of the Observer interface in RxJava:

Observer<String> observer = new Observer<String>() { 
  @Override 
  public void onNext(String s) { 
    (tag, "Item: " + s); 
  } 
  @Override 
  public void onCompleted() { 
    (tag, "Completed!"); 
  } 
  @Override 
  public void onError(Throwable e) { 
    (tag, "Error!"); 
  } 
}; 

In addition to the Observer interface, RxJava also has a built-in abstract class that implements Observer: Subscriber. Subscriber has some extensions to the Observer interface, but their basic usage is exactly the same:

Subscriber<String> subscriber = new Subscriber<String>() { 
  @Override 
  public void onNext(String s) { 
    (tag, "Item: " + s); 
  } 
  @Override 
  public void onCompleted() { 
    (tag, "Completed!"); 
  } 
  @Override 
  public void onError(Throwable e) { 
    (tag, "Error!"); 
  } 
}; 

Not only is the basic usage method the same, in essence, during the subscribe process of RxJava, Observer will always be converted into a Subscriber before use. So if you just want to use basic functions, choosing Observer and Subscriber are exactly the same. There are two main differences for users:

onStart(): This is how Subscribe adds. It will be called before the subscribe starts and the event is sent, and can be used to do some preparation, such as clearing or resetting data. This is an optional method, and its implementation is empty by default. It should be noted that if there are requirements for the preparation thread (for example, a dialog box showing progress pops up, which must be executed in the main thread), onStart() does not apply because it is always called in the thread that occurs in the subscribe thread and cannot specify the thread. To prepare in the specified thread, you can use the doOnSubscribe() method, which can be seen in the following article.

unsubscribe(): This is another method of Subscription implemented by Subscriber, which is used to unsubscribe. After this method is called, Subscriber will no longer receive events. Generally, before this method is called, you can use isUnsubscribed() to determine the status first. The unsubscribe() method is very important because after subscribe(), Observable will hold a reference to Subscriber. If this reference cannot be released in time, there will be a risk of memory leakage. So it is best to maintain a principle: call unsubscribe() in appropriate places (such as onPause() onStop() and other methods) as soon as possible when it is no longer used to dereferences to avoid memory leakage.

2) Create Observable

Observable is the observer, which determines when and what event will be triggered. RxJava uses the create() method to create an Observable and defines the event triggering rule for it:

Observable observable = (new <String>() { 
  @Override 
  public void call(Subscriber<? super String> subscriber) { 
    ("Hello"); 
    ("Hi"); 
    ("Aloha"); 
    (); 
  } 
}); 

As you can see, an OnSubscribe object is passed as a parameter here. OnSubscribe will be stored in the returned Observable object. Its function is equivalent to a schedule. When Observable is subscribed, the call() method of OnSubscribe will be automatically called, and the event sequence will be triggered in sequence according to the settings (for the above code, the observer Subscriber will be called three times onNext() and once onCompleted()). In this way, by calling the observer's callback method, the event transmission from the observer to the observer is realized, that is,Observer mode

The create() method is RxJava's most basic method to create event sequences. Based on this method, RxJava also provides some methods to quickly create event queues, such as:

just(T...): Send the passed parameters in sequence.

Observable observable = ("Hello", "Hi", "Aloha"); 
// will be called in sequence:// onNext("Hello"); 
// onNext("Hi"); 
// onNext("Aloha"); 
// onCompleted(); 

from(T[]) / from(Iterable<? extends T>) : After splitting the passed array or Iterable into specific objects, sending it out in turn.

String[] words = {"Hello", "Hi", "Aloha"}; 
Observable observable = (words); 
// will be called in sequence:// onNext("Hello"); 
// onNext("Hi"); 
// onNext("Aloha"); 
// onCompleted(); 

The above examples of just(T...) and from(T[]) are both equivalent to the previous examples of create(OnSubscribe).

3) Subscribe (subscribe)

After creating Observable and Observer, use the subscribe() method to connect them and the entire chain can work. The code form is very simple:

(observer); 
// or:(subscriber); 

The internal implementation of (Subscriber) is like this (core code only):

// Note: This is not the source code of subscribe(), but the core code after eliminating the code related to performance, compatibility, and scalability in the source code.// If you need to see the source code, you can go to RxJava's GitHub repository to download it.public Subscription subscribe(Subscriber subscriber) { 
  (); 
  (subscriber); 
  return subscriber; 
} 

As you can see, subscriber() does 3 things:

1. Call (). This method has been introduced before and is an optional preparation method.

2. Call (Subscriber) in Observable. Here, the logic for event sending starts running. From this we can also see that in RxJava, Observable does not start sending events immediately when it is created, but when it is subscribed, that is, when the subscribe() method is executed.

3. Return the incoming Subscriber as a Subscription. This is for convenience unsubscribe().
In addition to subscribe(Observer) and subscribe(Subscriber) , subscribe() also supports incompletely defined callbacks, and RxJava will automatically create Subscriber based on the definition. The form is as follows:

Action1&lt;String&gt; onNextAction = new Action1&lt;String&gt;() { 
  // onNext() 
  @Override 
  public void call(String s) { 
    (tag, s); 
  } 
}; 
Action1&lt;Throwable&gt; onErrorAction = new Action1&lt;Throwable&gt;() { 
  // onError() 
  @Override 
  public void call(Throwable throwable) { 
    // Error handling 
  } 
}; 
Action0 onCompletedAction = new Action0() { 
  // onCompleted() 
  @Override 
  public void call() { 
    (tag, "completed"); 
  } 
}; 
 
// Automatically create a Subscriber and use onNextAction to define onNext()(onNextAction); 
// Automatically create Subscriber and use onNextAction and onErrorAction to define onNext() and onError()(onNextAction, onErrorAction); 
// Automatically create Subscriber and use onNextAction, onErrorAction and onCompletedAction to define onNext(), onError() and onCompleted()(onNextAction, onErrorAction, onCompletedAction); 

A brief explanation of Action1 and Action0 that appear in this code. Action0 is an interface of RxJava. It has only one method call(), which has no parameters and no return value. Since the onCompleted() method also has no parameters and no return value, Action0 can be treated as a wrapper object, packing the content of onCompleted() and passing itself into subscribe() as a parameter to implement incompletely defined callbacks. This can actually be regarded as passing the onCompleted() method into subscribe() as a parameter, which is equivalent to the "closure" in some other languages. Action1 is also an interface, and it also has only one method call(T param), which also has no return value, but has one parameter; similar to Action0, since onNext(T obj) and onError(Throwable error) are also single-parameter without return value, Action1 can package onNext(obj) and onError(error) and pass in subscribe() to implement incompletely defined callbacks. In fact, while Action0 and Action1 are the most widely used in the API, RxJava provides multiple interfaces in the form of ActionX (such as Action2, Action3) that can be used to wrap different methods without return values.

4) Scenario example

Here are two examples:

a. Print an array of strings

Print out all strings in the string array names in sequence:

String[] names = ...; 
(names) 
  .subscribe(new Action1<String>() { 
    @Override 
    public void call(String name) { 
      (tag, name); 
    } 
  }); 

b. Get the picture from id and display it

The image is obtained from the specified drawable file id drawableRes, and displayed in the ImageView, and print toast errors when an exception occurs:

int drawableRes = ...; 
ImageView imageView = ...; 
(new OnSubscribe<Drawable>() { 
  @Override 
  public void call(Subscriber<? super Drawable> subscriber) { 
    Drawable drawable = getTheme().getDrawable(drawableRes)); 
    (drawable); 
    (); 
  } 
}).subscribe(new Observer<Drawable>() { 
  @Override 
  public void onNext(Drawable drawable) { 
    (drawable); 
  } 
 
  @Override 
  public void onCompleted() { 
  } 
 
  @Override 
  public void onError(Throwable e) { 
    (activity, "Error!", Toast.LENGTH_SHORT).show(); 
  } 
}); 

As in the above two examples, create Observable and Subscriber, and then string them together with subscribe(), and the basic use of RxJava is completed in one go. Very simple.

Notice:In RxJava's default rules, the issuance and consumption of events are on the same thread. In other words, if you only use the above method, what you can achieve is a synchronous observer mode. The purpose of the observer mode itself is the asynchronous mechanism of "backend processing and foreground callback", so asynchronous is crucial for RxJava. To implement asynchronousness, another concept of RxJava needs to be used: Scheduler.

Thread control — Scheduler (I)

Preface:

Without specifying a thread, RxJava follows the principle of thread immutability, that is, the thread calls subscribe() and the thread produces the event; the thread produces the event, the thread consumes the event. If you need to switch threads, you need to use Scheduler.

1) Scheduler's API (I)

In RxJava, Scheduler - a scheduler, equivalent to a thread controller. RxJava specifies what thread each piece of code should run on. RxJava has several schedulers built in, and they are already suitable for most usage scenarios:

(): Running directly on the current thread is equivalent to not specifying a thread. This is the default Scheduler.

():Always enable the new thread and perform operations on the new thread.

(): Scheduler used in I/O operations (read and write files, read and write databases, network information interaction, etc.). The behavior pattern is similar to newThread(). The difference is that the internal implementation of io() is to use an unlimited thread pool, which can reuse idle threads. Therefore, in most cases io() is more efficient than newThread(). Don't put the calculation work in io(), which can avoid creating unnecessary threads.

(): The Scheduler used to calculate. This calculation refers to CPU-intensive calculations, that is, operations that will not be limited by operations such as I/O, such as graphics calculations. This Scheduler uses a fixed thread pool with a size of CPU cores. Do not put I/O operations in computing(), otherwise the waiting time of I/O operations will waste CPU.

In addition, Android has a dedicated () that specifies operations to run on the main Android thread.

With these Schedulers, you can use subscribeOn() and observeOn() methods to control the thread. * subscribeOn(): Specifies the thread that occurs when subscribe(), that is, the thread that is located when it is activated. Or a thread generated by events. * observeOn(): Specifies the thread on which Subscriber runs. Or a thread of event consumption.

Code to understand the above text description:

(1, 2, 3, 4) 
  .subscribeOn(()) // Specify subscribe() to occur in IO thread  .observeOn(()) // Specifies that the callback of Subscriber occurs in the main thread  .subscribe(new Action1&lt;Integer&gt;() { 
    @Override 
    public void call(Integer number) { 
      (tag, "number:" + number); 
    } 
  }); 

In the above code, due to the specification of subscribeOn(()) , the contents of the created event 1, 2, 3, and 4 will be issued in the IO thread; and due to the specification of observeOn(()) , the printing of the subscribe number will occur in the main thread. In fact, this way of using subscribeOn(()) and observeOn(()) before subscribe() is very common. It is suitable for most program strategies of "background threads fetch data, main thread displays".

As for the example mentioned above, the image is obtained and displayed by the picture id, if these two sentences are added:

int drawableRes = ...; 
ImageView imageView = ...; 
(new OnSubscribe&lt;Drawable&gt;() { 
  @Override 
  public void call(Subscriber&lt;? super Drawable&gt; subscriber) { 
    Drawable drawable = getTheme().getDrawable(drawableRes)); 
    (drawable); 
    (); 
  } 
}) 
.subscribeOn(()) // Specify subscribe() to occur in IO thread.observeOn(()) // Specifies that the callback of Subscriber occurs in the main thread.subscribe(new Observer&lt;Drawable&gt;() { 
  @Override 
  public void onNext(Drawable drawable) { 
    (drawable); 
  } 
  @Override 
  public void onCompleted() { 
  } 
  @Override 
  public void onError(Throwable e) { 
    (activity, "Error!", Toast.LENGTH_SHORT).show(); 
  } 
}); 

Then, loading the image will occur in the IO thread, while setting the image will be set in the main thread. This means that even if it takes dozens or even hundreds of milliseconds to load the image, it will not cause any interface lag.

2) Principle of Scheduler (I)

RxJava's Scheduler API is very convenient and magical (I added a sentence and switched threads. How did it do it? And isn't subscribe() a method called directly from the outermost layer? Can it be specified threads?). However, the principle of Scheduler needs to be discussed later, because its principle is based on the principle of the following section "Transformation".

Well, I didn't say anything in this section, just to make you feel at ease and let you know that I didn't forget to explain the principle, but put it in a more appropriate place.

Transform

RxJava provides support for transforming event sequences, which is one of its core functions and the biggest reason why most people say "RxJava is so easy to use." The so-called transformation means processing the objects or entire sequences in the event sequence and converting them into different events or event sequences. The concept is always vague and difficult to understand, let’s look at the API.

1) API

First, let’s look at an example of map():

("images/") // Input type String  .map(new Func1&lt;String, Bitmap&gt;() { 
    @Override 
    public Bitmap call(String filePath) { // Parameter type String      return getBitmapFromPath(filePath); // Return type Bitmap    } 
  }) 
  .subscribe(new Action1&lt;Bitmap&gt;() { 
    @Override 
    public void call(Bitmap bitmap) { // Parameter type Bitmap      showBitmap(bitmap); 
    } 
  }); 

Here a class called Func1 appears. It is very similar to Action1 and is also an interface to RxJava, which is used to wrap methods containing a parameter. The difference between Func1 and Action is that Func1 wraps a method with a return value. In addition, like ActionX, FuncX also has multiple methods for different parameters. The difference between FuncX and ActionX is that FuncX wraps methods with return values.

It can be seen that the map() method converts the String object in the parameter into a Bitmap object and returns it. After passing the map() method, the parameter type of the event is also converted from String to Bitmap. This kind of transformation directly and returns is the most common and easiest transformation. However, RxJava's transformation is much more than that. It can not only target event objects, but also the entire event queue, which makes RxJava very flexible. I'll list a few commonly used transformations:

map(): Direct transformation of event objects, the specific functions have been introduced above. It is the most commonly used transformation in RxJava.

flatMap(): This is a very useful but very difficult to understand transformation, so I decided to spend more time introducing it. First, let’s assume such a requirement: suppose there is a data structure "student", and now we need to print out a set of students' names. The implementation method is very simple:

Student[] students = ...; 
Subscriber<String> subscriber = new Subscriber<String>() { 
  @Override 
  public void onNext(String name) { 
    (tag, name); 
  } 
  ... 
}; 
(students) 
  .map(new Func1<Student, String>() { 
    @Override 
    public String call(Student student) { 
      return (); 
    } 
  }) 
  .subscribe(subscriber); 

Very simple. So let’s assume again: What if we want to print out the names of all the courses that each student needs to take? (The difference in requirements is that each student has only one name, but has multiple courses.) First, it can be implemented like this:

Student[] students = ...; 
Subscriber<Student> subscriber = new Subscriber<Student>() { 
  @Override 
  public void onNext(Student student) { 
    List<Course> courses = (); 
    for (int i = 0; i < (); i++) { 
      Course course = (i); 
      (tag, ()); 
    } 
  } 
  ... 
}; 
(students) 
  .subscribe(subscriber); 

Still very simple. So what if I don't want to use a for loop in Subscriber, but instead want to pass a single Course object directly into Subscriber (this is important for code reuse)? It is obviously not possible to use map() because map() is a one-to-one conversion, and my current requirement is a one-to-many conversion. So how can one student be converted into multiple Course?

At this time, you need to use flatMap():

Student[] students = ...; 
Subscriber<Course> subscriber = new Subscriber<Course>() { 
  @Override 
  public void onNext(Course course) { 
    (tag, ()); 
  } 
  ... 
}; 
(students) 
  .flatMap(new Func1<Student, Observable<Course>>() { 
    @Override 
    public Observable<Course> call(Student student) { 
      return (()); 
    } 
  }) 
  .subscribe(subscriber); 

From the above code, we can see that flatMap() and map() have one similarity: it also converts the passed parameters and returns another object. But it should be noted that unlike map(), the return of flatMap() is an Observable object, and this Observable object is not directly sent to the Subscriber callback method. The principle of flatMap() is as follows: 1. Create an Observable object using the passed event object; 2. It does not send this Observable, but activates it, so it starts to send events; 3. Each event sent by the created Observable is transferred to the same Observable, and this Observable is responsible for handing these events to the Subscriber callback method. These three steps break the event into two levels, and the initial object is "paved" through a set of newly created Observables and distributed through a unified path. And this "flat" is what flatMap() calls flat.

Extension: Because asynchronous code can be added to nested Observables, flatMap() is also commonly used for nested asynchronous operations, such as nested network requests. Sample code (Retrofit + RxJava):

() // Return Observable<String>, request token when subscribe, and send token after response  .flatMap(new Func1&lt;String, Observable&lt;Messages&gt;&gt;() { 
    @Override 
    public Observable&lt;Messages&gt; call(String token) { 
      // Return Observable<Messages>, request a message list when subscribed, and send a list of requested messages after response      return (); 
    } 
  }) 
  .subscribe(new Action1&lt;Messages&gt;() { 
    @Override 
    public void call(Messages messages) { 
      // Process the display message list      showMessages(messages); 
    } 
  }); 

Traditional nested requests need to be implemented using nested Callbacks. And through flatMap(), nested requests can be written in a chain, thus keeping the program logic clear.

throttleFirst(): discards new events within a certain time interval after each event is fired. Commonly used as de-jitter filtering, such as button click listener: (button) // RxBinding code, the following article has an explanation. .throttleFirst(500, ) // Set up the anti-shake room

The interval is 500ms.subscribe(subscriber); Mom is no longer afraid that my user will tremble his hands and click on the two repeated interfaces.
In addition, RxJava also provides many convenient methods to implement event sequence transformation, so I will not give examples here.

2) The principle of transformation: lift()

Although these transformations have different functions, they are essentially processing and resend events. And inside RxJava, they are transform methods based on the same basis: lift(Operator). First, let’s take a look at the internal implementation of lift() (core code only):

// Note: This is not the source code of lift(), but the core code after eliminating the code related to performance, compatibility, and scalability in the source code.
// If you need to see the source code, you can go to RxJava's GitHub repository to download it.

public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) { 
  return (new OnSubscribe<R>() { 
    @Override 
    public void call(Subscriber subscriber) { 
      Subscriber newSubscriber = (subscriber); 
      (); 
      (newSubscriber); 
    } 
  }); 
} 

This code is interesting: it generates a new Observable and returns, and the implementation in the callback method call() of the parameter OnSubscribe used to create a new Observable actually looks the same as mentioned earlier ()! However, they are different~ The key to the difference is that the objects referred to by onSubscribe in the second line (subscriber) are different (high-energy warning: the next few sentences may cause serious physical discomfort)-

The onSubscribe in this sentence in subscribe() refers to the onSubscribe object in Observable. There is no problem with this, but the situation after lift() becomes a bit more complicated.

When lift() is included:

() After creating an Observable and adding the previous original Observable, there are already two Observables;

2. Similarly, the new OnSubscribe in the new Observable plus the original OnSubscribe in the previous original Observable has two OnSubscribes;

3. When the user calls the subscribe() of Observable after lift(), the new Observable returned by lift() is used, so the (subscriber) it triggers is also the new OnSubscribe in the new Observable, that is, the OnSubscribe generated in lift();

4. The onSubscribe in the call() method of this new OnSubscribe refers to the original OnSubscribe in the original Observable. In this call() method, the new OnSubscribe uses (subscriber) to generate a new Subscriber (the Operator is here, which associates the new Subscriber with the original Subscriber through its own call() method, and inserts its own "transform" code to implement the transformation), and then uses this new Subscriber to subscribe to the original Observable.

This implements the lift() process, which is a bit like a proxy mechanism that implements the transformation of event sequences through event interception and processing.
To simplify the details, it can also be said: after the Observable executes the lift(Operator) method, a new Observable will be returned. This new Observable will be like a proxy, responsible for receiving events sent by the original Observable and sending them to the Subscriber after processing.

Let’s give a specific implementation of Operator. Here is an example of converting an Integer object in an event into a String, for reference only:

(new &lt;String, Integer&gt;() { 
  @Override 
  public Subscriber&lt;? super Integer&gt; call(final Subscriber&lt;? super String&gt; subscriber) { 
    // Convert Integer objects in event sequence to String objects    return new Subscriber&lt;Integer&gt;() { 
      @Override 
      public void onNext(Integer integer) { 
        ("" + integer); 
      } 
      @Override 
      public void onCompleted() { 
        (); 
      } 
      @Override 
      public void onError(Throwable e) { 
        (e); 
      } 
    }; 
  } 
}); 

3) compose: overall transformation of Observable

In addition to lift(), Observable also has a transformation method called compose(Transformer). The difference between it and lift() is that lift() is for event items and event sequences, while compose() is for the Observable itself. For example, suppose there are multiple Observables in the program, and they all need to apply a set of the same lift() transformations. You can write this:

observable1 
  .lift1() 
  .lift2() 
  .lift3() 
  .lift4() 
  .subscribe(subscriber1); 
observable2 
  .lift1() 
  .lift2() 
  .lift3() 
  .lift4() 
  .subscribe(subscriber2); 
observable3 
  .lift1() 
  .lift2() 
  .lift3() 
  .lift4() 
  .subscribe(subscriber3); 
observable4 
  .lift1() 
  .lift2() 
  .lift3() 
  .lift4() 
  .subscribe(subscriber1); 

You think this is too unsoftware engineering, so you changed it to this:

private Observable liftAll(Observable observable) { 
  return observable 
    .lift1() 
    .lift2() 
    .lift3() 
    .lift4(); 
} 
... 
liftAll(observable1).subscribe(subscriber1); 
liftAll(observable2).subscribe(subscriber2); 
liftAll(observable3).subscribe(subscriber3); 
liftAll(observable4).subscribe(subscriber4); 

Readability and maintainability have been improved. However, Observable is wrapped in a method, which seems to add a little bit of limitation to Observe's flexibility. what to do? At this time, you should use compose() to solve it:

public class LiftAllTransformer implements <Integer, String> { 
  @Override 
  public Observable<String> call(Observable<Integer> observable) { 
    return observable 
      .lift1() 
      .lift2() 
      .lift3() 
      .lift4(); 
  } 
} 
... 
Transformer liftAll = new LiftAllTransformer(); 
(liftAll).subscribe(subscriber1); 
(liftAll).subscribe(subscriber2); 
(liftAll).subscribe(subscriber3); 
(liftAll).subscribe(subscriber4); 

As above, using the compose() method, Observable can directly process itself using the call method of the incoming Transformer object, so it does not have to be wrapped in the method.

The principle of compose() is relatively simple, and there is no picture attached.

Thread control: Scheduler (II)

In addition to flexible transformations, another awesome thing about RxJava is the free control of threads.

1) Scheduler's API (II)

As mentioned earlier, you can use subscribeOn() combined with observeOn() to implement thread control, so that the generation and consumption of events occur on different threads. But after understanding the transformation methods such as map() flatMap(), some good things (actually, I was when I first came into contact with RxJava) asked: Can I switch threads several times?
The answer is: Yes. Because observeOn() specifies the thread of the Subscriber, and this Subscriber is not (strictly speaking, it should be "not necessarily", but it may be understood as "not" here). Subscribe() parameter is the Subscriber corresponding to the current Observable when observable is executed, that is, its direct subordinate Subscriber. In other words, observeOn() specifies the thread where the operation after it is located. Therefore, if there is a need to switch threads multiple times, just call observeOn() once at each location where you want to switch threads. On code:

(1, 2, 3, 4) // IO thread, specified by subscribeOn()  .subscribeOn(()) 
  .observeOn(()) 
  .map(mapOperator) // New thread, specified by observeOn()  .observeOn(()) 
  .map(mapOperator2) // IO thread, specified by observeOn()  .observeOn()  
  .subscribe(subscriber); // Android Main thread,Depend on observeOn() Specify 

As mentioned above, through multiple calls to observeOn(), the program realizes multiple thread switching.

However, unlike observeOn() , subscribeOn() can be placed anywhere, but it can only be called once.

Another good thing (actually I was the one I did back then) asked: What if I had to call subscribeOn() multiple times? What will be the effect?

Let’s put this question first, let’s start with the principle of RxJava thread control.

2) Principle of Scheduler (II)

In fact, the internal implementations of subscribeOn() and observeOn() also use lift(). Look at the picture specifically (arrows of different colors represent different threads):
As can be seen from the figure, subscribeOn() and observeOn() both do thread switching (the "schedule..." part in the figure). The difference is that the thread switching of subscribeOn() occurs in OnSubscribe, that is, when it notifies the previous level of OnSubscribe, the event has not started to be sent yet, so the thread control of subscribeOn() can affect the beginning of the event; while the thread switching of observeOn() occurs in its built-in Subscriber, that is, when it is about to send an event to the next level of Subscriber, so observeOn() controls the thread behind it.

3) Extension: doOnSubscribe()

However, although more than one subscribeOn() has no effect on the process of event processing, it can be utilized before the process.

When talking about Subscriber, I mentioned that the onStart() of Subscriber can be used as initialization before the process begins. However, since onStart() is called when subscribe() occurs, it cannot specify a thread, but can only execute the thread when subscribe() is called. This leads to the risk of thread-demanding if onStart() contains code that requires threads (for example, displaying a ProgressBar on the interface, which must be executed on the main thread), because sometimes you cannot predict what thread subscribe() will execute on.

And corresponding to (), there is a method (). It is also executed after the subscribe() call and before the event is sent, but the difference is that it can specify threads. By default, doOnSubscribe() executes the thread that occurs in subscribe(); and if subscribeOn() is followed by doOnSubscribe(), it executes the thread specified by subscribeOn() closest to it.

Sample code:

(onSubscribe) 
  .subscribeOn(()) 
  .doOnSubscribe(new Action0() { 
    @Override 
    public void call() { 
      (); // Need to be executed in the main thread    } 
  }) 
  .subscribeOn(()) // Specify the main thread  .observeOn(()) 
  .subscribe(subscriber); 

As mentioned above, after doOnSubscribe(), you can specify the thread to prepare.

Summarize

The above is all about Rxjava in this article, and I hope it will be helpful to everyone. Interested friends can continue to refer to this site:Parsing the object locked by Synchronized in Java programmingIntroduction to the method of safe exit of Java multithreaded programmingThe basics of Java programming: imitating user login code sharingWait, if you have any questions, you can leave a message at any time, and the editor will reply to everyone in time. Thank you friends for your support for this site!