subscribeOn and observeOn are responsible for thread switching, and some operators also run on a default thread of their own.
This article does not analyze how work is executed on a thread. It only looks at how execution is switched to a specified thread.
subscribeOn
The subscribeOn() method creates an ObservableSubscribeOn object internally.
Let’s take a look at the subscribeActual method of ObservableSubscribeOn.
    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        // Call the onSubscribe method of the downstream Observer
        observer.onSubscribe(parent);
        // The upstream Observable is subscribed to through SubscribeTask
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
scheduler.scheduleDirect(Runnable) is used to execute the SubscribeTask. SubscribeTask itself implements Runnable. Take a look at its run method.
    @Override
    public void run() {
        // The upstream subscription now happens on the new thread
        source.subscribe(parent);
    }
First, we can draw a conclusion: subscribeOn switches the upstream Observable subscribe method to a new thread.
What happens if subscribeOn is called multiple times to switch threads?
Subscription travels from bottom to top, and each subscribeOn switches the subscribeActual of its upstream Observable to the specified thread. The subscribeOn reached last, the one closest to the source, therefore decides the thread on which the creation operator's subscribeActual runs. What if the operator has a default execution thread?
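The bottom-to-top behaviour can be reproduced with a small model that uses only the JDK. This is a sketch, not RxJava code: MiniSource, SubscribeOnSketch and the thread names are hypothetical, and the executor stands in for a Scheduler. It only mimics the idea that each subscribeOn subscribes to its upstream from inside a task on its own thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical stand-in for an Observable: subscribing simply runs a callback.
interface MiniSource {
    void subscribe(Consumer<String> observer);

    // Modeled on ObservableSubscribeOn: subscribe to the upstream from a task on the executor.
    default MiniSource subscribeOn(ExecutorService executor) {
        MiniSource upstream = this;
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }
}

final class SubscribeOnSketch {
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    // Returns the name of the thread on which the source's subscribe logic ran.
    static String sourceThread() {
        ExecutorService near = named("near-source");
        ExecutorService far = named("far-from-source");
        try {
            // "Creation operator": emits the name of the thread it is subscribed on.
            MiniSource source = observer -> observer.accept(Thread.currentThread().getName());
            CompletableFuture<String> result = new CompletableFuture<>();
            source.subscribeOn(near).subscribeOn(far).subscribe(result::complete);
            return result.join();
        } finally {
            near.shutdown();
            far.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sourceThread()); // prints "near-source"
    }
}
```

The second subscribeOn does run first, on far-from-source, but all it does there is subscribe to the first one, which hops again to near-source before the source is reached.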
Operator default thread
If the operator with a default execution thread is the creation operator at the top of the chain, the thread switching of subscribeOn has no effect on it: the operator emits on its own thread regardless. As the saying goes, the nearest authority outranks the distant one.
What about other operators?
Take the timeout operator as an example. It corresponds to ObservableTimeoutTimed and TimeoutObserver.
    // TimeoutObserver (excerpt)
    @Override
    public void onNext(T t) {
        // Omitted: idx is read from the observer's counter
        downstream.onNext(t);
        // Restart the timeout timer
        startTimeout(idx + 1);
    }

    void startTimeout(long nextIndex) {
        // Hand the timeout task to the operator's default thread
        task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit));
    }

    @Override
    public void onError(Throwable t) {
        downstream.onError(t);
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }

    @Override
    public void onTimeout(long idx) {
        downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));
    }

    // TimeoutTask (excerpt)
    static final class TimeoutTask implements Runnable {
        @Override
        public void run() {
            parent.onTimeout(idx);
        }
    }
You can see that the operator's default execution thread is only used to run the timeout timer task. If a timeout occurs, the onError method is executed on the operator's default thread, so the default thread does affect the downstream observer. Each such operator has to be examined case by case.
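The effect is easy to see in a JDK-only model. This is a sketch under stated assumptions, not the RxJava implementation: TimeoutThreadSketch, the 50 ms delay and the thread name are made up, and a ScheduledExecutorService stands in for the operator's default scheduler. The point is that whoever runs the timer task is also the one who calls the downstream onError.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

final class TimeoutThreadSketch {
    // Returns the name of the thread that delivered the timeout error downstream.
    static String timeoutErrorThread() {
        // Hypothetical stand-in for the operator's default scheduler.
        ScheduledExecutorService timer =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "timeout-scheduler"));
        try {
            CompletableFuture<String> errorThread = new CompletableFuture<>();
            // Stand-in for the downstream Observer's onError.
            Consumer<Throwable> downstreamOnError =
                    e -> errorThread.complete(Thread.currentThread().getName());
            // Stand-in for startTimeout(): the task is handed to the operator's own scheduler,
            // and the task itself calls the downstream onError, as onTimeout() does.
            timer.schedule(
                    () -> downstreamOnError.accept(new TimeoutException("no item within 50 ms")),
                    50, TimeUnit.MILLISECONDS);
            // The "upstream" never emits, so the timeout task fires.
            return errorThread.join();
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(timeoutErrorThread()); // prints "timeout-scheduler"
    }
}
```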
observeOn
observeOn corresponds to ObservableObserveOn and ObserveOnObserver.
    // ObservableObserveOn
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    // ObserveOnObserver (excerpt)
    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {
            if (d instanceof QueueDisposable) {
                // Omitted: m is the fusion mode negotiated with the upstream
                if (m == QueueDisposable.SYNC) {
                    // Execute the onSubscribe method of the downstream Observer
                    downstream.onSubscribe(this);
                    schedule();
                    return;
                }
                if (m == QueueDisposable.ASYNC) {
                    // Execute the onSubscribe method of the downstream Observer
                    downstream.onSubscribe(this);
                    return;
                }
            }
            // Execute the onSubscribe method of the downstream Observer
            downstream.onSubscribe(this);
        }
    }

    @Override
    public void onNext(T t) {
        // Omitted
        schedule();
    }

    @Override
    public void onError(Throwable t) {
        // Omitted
        schedule();
    }

    void schedule() {
        if (getAndIncrement() == 0) {
            // ObserveOnObserver implements Runnable; it is handed to the worker for execution
            worker.schedule(this);
        }
    }

    void drainNormal() {
        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = downstream;
        for (;;) {
            // Omitted: termination checks
            for (;;) {
                T v;
                try {
                    v = q.poll();
                } catch (Throwable ex) {
                    a.onError(ex);
                    return;
                }
                // Omitted: termination checks
                // Execute the onNext method of the downstream Observer
                a.onNext(v);
            }
        }
    }

    void drainFused() {
        for (;;) {
            boolean d = done;
            Throwable ex = error;
            if (!delayError && d && ex != null) {
                // Execute the onError method of the downstream Observer
                downstream.onError(error);
                return;
            }
            downstream.onNext(null);
            if (d) {
                ex = error;
                if (ex != null) {
                    // Execute the onError method of the downstream Observer
                    downstream.onError(ex);
                } else {
                    // Execute the onComplete method of the downstream Observer
                    downstream.onComplete();
                }
                return;
            }
            // Omitted
        }
    }

    // Runs on the worker thread
    @Override
    public void run() {
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
    }
From the above, we can see that ObservableObserveOn does not switch the execution thread of the upstream Observable subscribe method in its subscribeActual method. However, ObserveOnObserver switches various methods of the downstream Observer to a new thread through the schedule() method in its onNext, onError and onComplete.
Conclusion: observeOn is responsible for switching the execution thread of the downstream Observer's methods.
What happens if the downstream switches threads through observeOn multiple times?
Each switch will have an impact on its downstream until it encounters the next observeOn.
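A JDK-only model makes the "until the next observeOn" rule concrete. As before, this is a sketch rather than RxJava code: MiniFlow, peekThread, ObserveOnSketch and the thread names are hypothetical. The model's observeOn leaves the subscription where it is and only re-dispatches the downstream callback, which is the idea behind ObserveOnObserver's schedule().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical stand-in for an Observable that only has onNext.
interface MiniFlow {
    void subscribe(Consumer<String> onNext);

    // The subscription itself is not moved; only the downstream onNext is re-dispatched.
    default MiniFlow observeOn(ExecutorService executor) {
        MiniFlow upstream = this;
        return onNext -> upstream.subscribe(item -> executor.execute(() -> onNext.accept(item)));
    }

    // Pass-through stage that records which thread its callback runs on.
    default MiniFlow peekThread(String label, List<String> log) {
        MiniFlow upstream = this;
        return onNext -> upstream.subscribe(item -> {
            log.add(label + "@" + Thread.currentThread().getName());
            onNext.accept(item);
        });
    }
}

final class ObserveOnSketch {
    static List<String> run() {
        ExecutorService a = Executors.newSingleThreadExecutor(r -> new Thread(r, "thread-A"));
        ExecutorService b = Executors.newSingleThreadExecutor(r -> new Thread(r, "thread-B"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            MiniFlow source = onNext -> onNext.accept("item");
            source.observeOn(a)
                  .peekThread("first", log)   // between the two observeOn calls
                  .observeOn(b)
                  .peekThread("second", log)  // after the second observeOn
                  .subscribe(item -> done.countDown());
            done.await();
            return log;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            a.shutdown();
            b.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "[first@thread-A, second@thread-B]"
    }
}
```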
Observer (onSubscribe, onNext, onError, onComplete)
onNext, onError and onComplete run on the thread set by the closest upstream observeOn. onSubscribe is different.
When a thread switch is encountered, onSubscribe is called first, inside the subscribeActual method of the corresponding Observable, before any work is handed to another thread. The subscription is passed up step by step to the top of the chain, and every onSubscribe call made along the way happens inside a subscribeActual method on the thread that called subscribe(), typically the main thread. Therefore, the Observer's onSubscribe method runs on that thread no matter which thread switches are applied.
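The ordering matters here: onSubscribe is called before the task is scheduled. A JDK-only sketch of that ordering follows. MiniObserver, MiniObservable, OnSubscribeThreadSketch and the thread name are hypothetical, and the model ignores Disposable handling entirely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class OnSubscribeThreadSketch {
    interface MiniObserver {
        void onSubscribe();
        void onNext(String item);
    }

    interface MiniObservable {
        void subscribe(MiniObserver observer);
    }

    // Same order as ObservableSubscribeOn.subscribeActual: onSubscribe first, then schedule.
    static MiniObservable subscribeOn(MiniObservable upstream, ExecutorService executor) {
        return observer -> {
            observer.onSubscribe(); // still on the caller's thread
            executor.execute(() -> upstream.subscribe(new MiniObserver() {
                @Override public void onSubscribe() { /* downstream was already notified */ }
                @Override public void onNext(String item) { observer.onNext(item); }
            }));
        };
    }

    // Returns { thread of onSubscribe, thread of onNext }.
    static String[] run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        try {
            MiniObservable source = observer -> {
                observer.onSubscribe();
                observer.onNext("item");
            };
            CompletableFuture<String> onSubscribeThread = new CompletableFuture<>();
            CompletableFuture<String> onNextThread = new CompletableFuture<>();
            subscribeOn(source, io).subscribe(new MiniObserver() {
                @Override public void onSubscribe() {
                    onSubscribeThread.complete(Thread.currentThread().getName());
                }
                @Override public void onNext(String item) {
                    onNextThread.complete(Thread.currentThread().getName());
                }
            });
            return new String[] { onSubscribeThread.join(), onNextThread.join() };
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("onSubscribe on " + threads[0] + ", onNext on " + threads[1]);
    }
}
```

In this model onSubscribe reports the name of whichever thread called subscribe(), while onNext reports io-thread.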
doOnSubscribe
    .doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
        }
    })
What we want to look at is the execution thread of the method accept.
Find the corresponding DisposableLambdaObserver through the source code.
    // DisposableLambdaObserver (excerpt)
    @Override
    public void onSubscribe(Disposable d) {
        // The accept method is called here
        onSubscribe.accept(d);
    }
This depends on which thread the upstream calls onSubscribe(disposable) on.
onSubscribe(disposable) is called in the subscribeActual method of the creation operator and in the subscribeActual method of the Observable corresponding to subscribeOn. The threads executing at these two places therefore determine the thread on which accept(d) runs.
doFinally
Corresponding to ObservableDoFinally and DoFinallyObserver
    // DoFinallyObserver (excerpt)
    @Override
    public void onError(Throwable t) {
        runFinally();
    }

    @Override
    public void onComplete() {
        runFinally();
    }

    @Override
    public void dispose() {
        runFinally();
    }

    void runFinally() {
        onFinally.run();
    }
You can see that the action runs on whichever thread executes the onError, onComplete or dispose method of the corresponding DoFinallyObserver. For onError and onComplete, that thread is set by the closest upstream observeOn. If there is no observeOn, it is whichever thread the upstream emits on. dispose runs on the thread that calls it.
doOnError
Corresponding to ObservableDoOnEach and DoOnEachObserver
    // DoOnEachObserver (excerpt)
    @Override
    public void onError(Throwable t) {
        onError.accept(t);
    }
The callback runs on the same thread as the corresponding onError method.
doOnNext
Corresponding to ObservableDoOnEach and DoOnEachObserver
    // DoOnEachObserver (excerpt)
    @Override
    public void onNext(T t) {
        onNext.accept(t);
    }
The callback runs on the same thread as the corresponding onNext method.
Execution thread for operator corresponding method parameters
The functional interfaces passed to operators as parameters (for example Consumer) are generally used to process upstream data and then pass it down. Their methods are generally called from the corresponding Observer method (onNext, onError and so on), so they run on the same thread as that method.
Summary:
subscribeOn switches threads from bottom to top. The thread it sets is not affected by observeOn, nor by non-creation operators that have a default thread. It is, however, overridden by any subscribeOn further upstream, all the way to the top. If the creation operator at the top also has a default execution thread, no subscribeOn takes effect for it. Once the subscription has reached the top, the execution thread of the downstream observers is decided from top to bottom. observeOn takes over thread control here: it decides the execution thread of the downstream observers until the next observeOn, which takes over in turn. Non-creation operators with a default thread have to be examined case by case.