1. Delay the execution action
It can be implemented using the timer+map method. The code is as follows:
(5, ).map(value->{ return doSomething(); }).subscribe(::println); }
2. Delay the sending result
This scenario requires that the action of generating data be executed immediately, but the result is delayed in sending. This is different from the above scenario.
This scenario can be usedTo achieve it.
The zip operator combines the data transmitted by multiple Observables in order, each data can only be combined once, and they are all ordered. The number of final combined data is determined by the Observable, which transmits the least data.
For data in the same location of each Observable, you need to wait for each other. That is to say, after the data in the first location of the first observable is generated, you have to wait for the data in the first location of the second observable to be generated, and after the data in the same location of each Observable is generated, you can combine according to the specified rules. This is really what we want to use.
There are many kinds of declarations in zip, but it is roughly the same, which is to pass in several observables, then specify a rule to process the data at the corresponding location of each observable, and generate a new data. Here is one of the simplest:
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction);
The execution results of push and sending using zip are as follows:
((5,) ,(doSomething()), (x,y)->y) .subscribe(::println));
3. Use defer to perform some action in a specified thread
As in the following code, although we specify the running method of the thread,doSomething()
This function is still executed in the thread called by the current code.
(doSomething()) .subscribeOn(()) .observeOn(()) .subscribe(v->(()););
Usually we use the following methods to achieve our goal:
(s->{(doSomething());}) .subscribeOn(()) .observeOn(()) .subscribe(v->{ (()); });
But in fact, we can achieve the same goal by using defer.
About defer
The defer operator is the same as create, just, from and other operators. It creates class operators, but all data related to this operator takes effect only if you subscribe.
statement:
public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory);
The Observable in defer's Func0 is created only when subscribed.
effect:
Do not create the Observable until an Observer subscribes; create a fresh Observable on each subscription.
In other words, observable is created when subscribing.
The above problem is implemented with defer:
(()->(doSomething())) .subscribeOn(()) .observeOn(()) .subscribe(v->{(()); });
4. Do not break the chain structure using compose
We often see the following code:
(doSomething()) .subscribeOn(()) .observeOn(()) .subscribe(v->{(());
In the above code,subscribeOn(xxx).observeOn(xxx)
It may be the same in many places. If we plan to implement it in a certain place, we can write it like this:
private static <T> Observable<T> applySchedulers(Observable<T> observable) { return (()) .observeOn(()); }
But every time we need to call the above method, it will roughly be like the following, and the outermost is a function, which is equivalent to breaking the link structure:
applySchedulers((someSource).map(new Func1<Data, Data>() { @Override public Data call(Data data) { return manipulate(data); } }) ).subscribe(new Action1<Data>() { @Override public void call(Data data) { doSomething(data); } });
The compose operator can be used to achieve the purpose of not breaking the link structure.
The statement of compose is as follows:
public Observable compose(Transformer<? super T, ? extends R> transformer);
Its entry parameter is a Transformer interface, and the output is an Observable. And Transformer is actually aFunc1<Observable<T>
, Observable<R>>
, in other words: one type of Observable can be converted into another type of Observable.
Simply put, compose can convert the original observable into another Observable through the specified conversion method (input parameter transformer).
Through compose, specify the thread method in the following way:
private static <T> Transformer<T, T> applySchedulers() { return new Transformer<T, T>() { @Override public Observable<T> call(Observable<T> observable) { return (()) .observeOn(()); } }; } (doSomething()).compose(applySchedulers()) .subscribe(v->{(()); });
The function applySchedulers can be further simplified using lambda expressions to the following:
private static <T> Transformer<T, T> applySchedulers() { return observable->(()) .observeOn(()); }
5. Use different execution results according to priority
The above title probably did not express the scenario I wanted to express clearly. In fact, the scenario I want to express is similar to the usual scenario of obtaining network data: if there is a cache, it will be obtained from the cache, and if there is no, it will be obtained from the network.
It is required here that if there is a cache, the action of obtaining data from the network will not be performed.
This can be implemented using concat+first.
concat merges several Observables into one Observable and returns the final Observable. And those data are like being sent from an Observable. The parameters can be multiple Observables or iterator containing Observalbe.
The data arrangement in the new observable is arranged in the order of the original observable in the concat, that is, the data in the new result is sorted in the original order.
The following is the implementation of the above requirements:
(getDataFromCache(),getDataFromNetwork()).first() .subscribe(v->("result:"+v)); //Get data from cache private static Observable<String> getDataFromCache(){ return (s -> { //dosomething to get data int value = new Random().nextInt(); value = value%2; if (value!=0){ ("data from cache:"+value); //Create data } //(new Throwable("none")); (); } ); } //Get data from the network private static Observable<String> getDataFromNetwork(){ return (s -> { for (int i = 0; i < 10; i++) { ("obs2 generate "+i); ("data from network:" + i); //Create data } (); } ); }
In the above implementation, if getDataFromCache has data, the code here in getDataFromNetwork will not be executed, which is exactly what we want.
There are several implementations above that need attention:
1. It is possible that data cannot be obtained from both places. In this scenario, using first will throw an exception NoSuchElementException. If this scenario is like this, you need to replace the first above with firstOrDefault.
2. The abovegetDataFromCache()
In this case, if there is no data, we call onCompleted directly. If we do not call onCompleted but call onError, then the above-mentioned use of concat will not get any results. Because when concat receives any error, the merge will stop. Therefore, if we want to use onError, we need to use concatDelayError instead.The error will be ignored first and the error will be postponed until the end of processing.
Summarize
The above is the entire content of this article. I hope the content of this article will be of some help to your study or work. If you have any questions, you can leave a message to communicate.