SoFunction
Updated on 2025-03-01

Some pitfalls encountered when using RxJava

Preface

The more people use RxJava, the handier it seems, and before long it is everywhere in the code. RxJava is not a silver bullet, however, and it still leaves plenty of problems to solve. Here I briefly summarize some of the pitfalls I have run into; the notes are fairly loosely organized.

1. Be careful when switching to the main thread

A common way to use RxJava is to do the processing on another thread and then switch to the UI thread to update the page, with observeOn() performing the thread switch. Downloading a file in the background and showing the download progress in the foreground can be done this way. In practice, however, there is a pitfall here: if the file is large and the download is delivered in small chunks, progress notifications pile up and eventually cause an error.

This kind of error is understandable. After all, the main Looper works on Messages, and too many Messages will inevitably cause problems. That is only the intuitive explanation, though; the real answer has to come from the source code. How observeOn() works was analyzed in a previous article about RxJava, so the code is only listed briefly here. The focus is ObserveOnSubscriber, an inner class of OperatorObserveOn. The important code is as follows:

 /** Observe through individual queue per observer. */
 static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
  final Subscriber<? super T> child;
  final Scheduler.Worker recursiveScheduler;
  final NotificationLite<T> on;
  final boolean delayError;
  final Queue<Object> queue;
  /** The emission threshold that should trigger a replenishing request. */
  final int limit;

  // the status of the current stream
  volatile boolean finished;

  final AtomicLong requested = new AtomicLong();

  final AtomicLong counter = new AtomicLong();

  /**
   * The single exception if not null, should be written before setting finished (release) and read after
   * reading finished (acquire).
   */
  Throwable error;

  /** Remembers how many elements have been emitted before the requests run out. */
  long emitted;

  // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
  // not prevent anything downstream from consuming, which will happen if the Subscription is chained
  public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
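   this.child = child;
   this.recursiveScheduler = scheduler.createWorker();
   this.delayError = delayError;
   this.on = NotificationLite.instance();
   int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
   // this formula calculates the 75% of the bufferSize, rounded up to the next integer
   this.limit = calculatedSize - (calculatedSize >> 2);
   if (UnsafeAccess.isUnsafeAvailable()) {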
    queue = new SpscArrayQueue<Object>(calculatedSize);
   } else {
    queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
   }
   // signal that this is an async operator capable of receiving this many
   request(calculatedSize);
  }

  void init() {
   // don't want this code in the constructor because `this` can escape through the
   // setProducer call
   Subscriber<? super T> localChild = child;

   localChild.setProducer(new Producer() {

    @Override
    public void request(long n) {
     if (n > 0L) {
      BackpressureUtils.getAndAddRequest(requested, n);
      schedule();
     }
    }

   });
   localChild.add(recursiveScheduler);
   localChild.add(this);
  }

  @Override
  public void onNext(final T t) {
   if (isUnsubscribed() || finished) {
    return;
   }
   if (!queue.offer(on.next(t))) {
    onError(new MissingBackpressureException());
    return;
   }
   schedule();
  }

  @Override
  public void onCompleted() {
   if (isUnsubscribed() || finished) {
    return;
   }
   finished = true;
   schedule();
  }

  @Override
  public void onError(final Throwable e) {
   if (isUnsubscribed() || finished) {
    RxJavaHooks.onError(e);
    return;
   }
   error = e;
   finished = true;
   schedule();
  }

  protected void schedule() {
   if (counter.getAndIncrement() == 0) {
    recursiveScheduler.schedule(this);
   }
  }
 }

The key is the queue member, which stores the messages waiting to be delivered to the downstream thread. The main thread already carries a fairly heavy load. From the producer-consumer point of view, messages that arrive too fast and in too great a number cause congestion. It may not even get as far as congestion, because the queue has a size limit: the attempt to enqueue an item in onNext() can fail, depending on how the queue is implemented, and the code above then reports a MissingBackpressureException. In any case the queue cannot be infinitely large, so there is no guarantee that no exception will occur.
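
The overflow can be reproduced with a small stand-alone model. The sketch below is not RxJava code: it assumes a plain ArrayBlockingQueue as a stand-in for the operator's internal queue, and the class name BoundedQueueDemo, the method firstRejected, and the capacity of 16 are chosen only for illustration. It shows that a fixed-size queue rejects an item as soon as the producer outruns the consumer for long enough.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

/** Stand-in model (not RxJava code): a fixed-capacity queue between a fast producer and a slow consumer. */
final class BoundedQueueDemo {

    /**
     * Offers `produced` items into a queue of the given capacity while the consumer
     * drains one item for every `consumeEvery` produced items.
     * Returns the index of the first rejected item, or -1 if every offer succeeded.
     */
    static int firstRejected(int capacity, int produced, int consumeEvery) {
        Queue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < produced; i++) {
            if (!queue.offer(i)) {
                // In ObserveOnSubscriber.onNext() a failed enqueue is the point where
                // onError(new MissingBackpressureException()) is called.
                return i;
            }
            if ((i + 1) % consumeEvery == 0) {
                queue.poll(); // the slow consumer handles one item
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Producer four times faster than the consumer: the queue eventually overflows.
        System.out.println(firstRejected(16, 1_000, 4));
        // Consumer keeps pace with the producer: no overflow.
        System.out.println(firstRejected(16, 1_000, 1));
    }
}
```

A larger capacity only postpones the first rejection; as long as the producer stays faster than the consumer, the queue fills up sooner or later.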

The solution is actually simple: reduce the rate at which the producer generates messages. Alternatively, skip the thread switch while processing messages and switch threads only when it is needed, for example with runOnUiThread().
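
One way to reduce the message rate at the producer is to forward a progress update only when the displayed value would actually change. The following is a minimal sketch under that assumption; ProgressThrottler, onChunk, and ProgressThrottlerDemo are hypothetical names rather than part of RxJava or Android, and the listener stands for whatever code hands the value on to the UI thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

/** Hypothetical helper: forwards a progress update only when the whole-number percentage changes. */
final class ProgressThrottler {
    private final long totalBytes;
    private final IntConsumer listener; // e.g. the code that passes the value to the UI thread
    private long receivedBytes;
    private int lastPercent = -1;

    ProgressThrottler(long totalBytes, IntConsumer listener) {
        if (totalBytes <= 0) {
            throw new IllegalArgumentException("totalBytes must be positive");
        }
        this.totalBytes = totalBytes;
        this.listener = listener;
    }

    /** Called by the download loop for every chunk; most calls emit nothing. */
    void onChunk(int chunkBytes) {
        receivedBytes = Math.min(totalBytes, receivedBytes + chunkBytes);
        int percent = (int) (receivedBytes * 100 / totalBytes);
        if (percent != lastPercent) {
            lastPercent = percent;
            listener.accept(percent);
        }
    }
}

final class ProgressThrottlerDemo {
    /** Simulates a download of `totalBytes` in chunks of `chunkBytes` and returns the emitted percentages. */
    static List<Integer> simulate(long totalBytes, int chunkBytes) {
        List<Integer> emitted = new ArrayList<>();
        ProgressThrottler throttler = new ProgressThrottler(totalBytes, emitted::add);
        for (long sent = 0; sent < totalBytes; sent += chunkBytes) {
            throttler.onChunk(chunkBytes);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // 1 MiB in 64-byte chunks is 16384 chunks, but at most 101 notifications (0 to 100).
        List<Integer> emitted = simulate(1 << 20, 64);
        System.out.println(emitted.size() + " notifications, last = " + emitted.get(emitted.size() - 1));
    }
}
```

Because the number of notifications no longer depends on the chunk size, the queue between the download thread and the UI thread stays short however finely the download is split.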

2. Avoiding memory leaks in RxJava

RxJava's reactive mechanism is essentially implemented with callbacks, so memory leaks can occur. If Subscriptions are not managed, the leaks can be serious. Several approaches to managing Subscriptions are in wide use, such as RxLifecycle and the simpler CompositeSubscription. They are easy to use, so I won't go into details here.
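
The idea behind collecting subscriptions can be illustrated without RxJava. The sketch below is a plain-Java model, not the library's API: Cancellable, EventSource, and CancellableBag are hypothetical names, with CancellableBag playing roughly the role that CompositeSubscription plays in RxJava 1.x (add handles as they are created, release them all when the screen goes away).

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for a subscription handle: something that can be cancelled. */
interface Cancellable {
    void cancel();
}

/** Hypothetical event source that keeps strong references to its listeners, as callback APIs usually do. */
final class EventSource {
    private final List<Runnable> listeners = new ArrayList<>();

    /** Registers a listener and returns a handle that removes it again. */
    Cancellable subscribe(Runnable listener) {
        listeners.add(listener);
        return () -> listeners.remove(listener);
    }

    int listenerCount() {
        return listeners.size();
    }
}

/** Collects handles so that they can all be cancelled in one place, e.g. in onDestroy(). */
final class CancellableBag {
    private final List<Cancellable> handles = new ArrayList<>();

    void add(Cancellable handle) {
        handles.add(handle);
    }

    void clear() {
        for (Cancellable handle : handles) {
            handle.cancel();
        }
        handles.clear();
    }
}

final class LeakDemo {
    public static void main(String[] args) {
        EventSource source = new EventSource();
        CancellableBag bag = new CancellableBag();

        // Each listener keeps alive whatever it captures (on Android, typically an Activity or a View).
        bag.add(source.subscribe(() -> System.out.println("update UI")));
        bag.add(source.subscribe(() -> System.out.println("update title")));
        System.out.println(source.listenerCount());

        // Without this call the source would keep both listeners, and everything they capture, reachable.
        bag.clear();
        System.out.println(source.listenerCount());
    }
}
```

With RxJava 1.x itself the same pattern would use CompositeSubscription.add() for each Subscription and unsubscribe() or clear() when the owner is destroyed.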

While on the subject of memory leaks, a digression: animations can also cause them. The cause is again callback functions, here the ones that implement the View changes. If these callbacks are not cancelled after the View is removed, and the View holds a Context, memory is leaked. I recently found that the open-source library LoadToastView has had a memory leak all along, for exactly this reason.

Summary

That is all for this article. I hope it is of some help in your study or work. If you have any questions, feel free to leave a comment. Thank you for your support.