Monday, May 11, 2015

Operator concurrency primitives: serialized access (part 1)

Introduction

The most important requirement of the RxJava library is that the Observer/Subscriber methods onNext, onError and onCompleted are called in a sequential manner. We often call this serialized access, but it has nothing to do with Java's own Serializable interface or data serialization in general.

Such serialization comes into play when an operator is involved in some concurrent behavior. A typical example is when one merges multiple streams of data in some way, for example by using the merge, zip or combineLatest operators.

But there are other places where such serialization is required that are not so obvious at first glance. One such example is the takeUntil operator. In this operator, we use the emission/completion of another, provided Observable to terminate the main sequence by calling onCompleted() on the downstream Observer. Since both the main sequence and the until sequence can run asynchronously, each can emit any kind of onXXX event at any time relative to the other, but the downstream still has to receive the events sequentially.

The classical way of approaching such a serialization problem is by using synchronized on the Observer methods. This might seem to get the job done, but it is prone to deadlocks if any of the downstream operators block while interacting with the upstream in some way. Such deadlocks can happen with other RxJava infrastructure elements as well.
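As a minimal sketch of that classical approach (this is not RxJava code; the class name and fields are made up for illustration), the intrinsic lock does guarantee ordering, but it is held for the entire duration of the downstream call, which is exactly what opens the door to deadlocks:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical naive serializer: event ordering is correct, but the
// intrinsic lock on 'this' is held while the downstream work runs, so
// any blocking inside onNext() happens lock-in-hand and can deadlock
// against other locks held elsewhere in the pipeline.
class NaiveSerializedObserver<T> {
    final List<T> received = new ArrayList<>();

    public synchronized void onNext(T value) {
        received.add(value);   // downstream work runs under 'this'
    }

    public synchronized void onCompleted() {
        // terminal event, serialized with onNext by the same lock
    }
}
```

The serialization guarantee here is trivially correct; the problem is purely what happens while the lock is held.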

Therefore, in RxJava we forbid calling any of the following methods while holding locks:

  • Observer: onNext, onError, onCompleted
  • Subscriber: onStart, setProducer, request
  • Subscription: unsubscribe
  • Scheduler.Worker: schedule, schedulePeriodically
  • Producer: request
(Unfortunately, we can't enforce this automatically, so pull requests are manually checked for violations.)

Since locks are forbidden, we need to use some other serialization construct.

There are mainly two kinds of serialization constructs employed in RxJava. I call them emitter-loop and queue-drain. (I did not invent them, but most Java concurrency books I've read don't seem to mention these kinds of constructs.)

Note: I'll be using Java 8 syntax in the remainder of the post to save on some boilerplate code. However, RxJava needs to be polyglot and is targeted at Java 6 source levels; therefore, if you post a pull request to the core library, make sure the syntax and standard method calls use Java 6 language and library features only.

Emitter-loop

I call this construct the emitter-loop because it uses a boolean flag to indicate that a thread is doing emission work on behalf of others, and a loop that keeps emitting until there isn't any work left.

The construct uses a boolean instance field emitting which is accessed in a synchronized block.

class EmitterLoopSerializer {
    boolean emitting;
    boolean missed;
    public void emit() {
        synchronized (this) {           // (1)
            if (emitting) {
                missed = true;          // (2)
                return;
            }
            emitting = true;            // (3)
        }
        for (;;) {
            // do the emission work     // (4) 
            synchronized (this) {       // (5)
                if (!missed) {          // (6)
                    emitting = false;
                    return;
                }
                missed = false;         // (7)
            }
        }
    }
}

Let's see how the emit() method in the example works.

  1. When we enter the synchronized block, we can be in one of two states: nobody is currently emitting, or somebody is emitting. Since we are in the block, if there is any ongoing emission at (4), that thread can't enter the synchronized block at (5) until (1) has finished.
  2. In case there is an emission going on, we need to indicate there is more work to do. The example shows a simple way of doing this by setting another boolean flag. Depending on what kind of emission should happen, the more-work indicator can have a different data type (for example, java.util.List in some of RxJava's operators). More on these later.
  3. In case there was no emission going on, a single thread wins the right to emit, which is indicated by setting the emitting flag to true.
  4. When a thread has won the right to emit, it enters the loop and performs as much emission work as possible. What this work is depends highly on the purpose of the emitter-loop, but it must be implemented with care: done incorrectly, it can lead to signal loss and hangs (inactivity).
  5. Once the loop thinks all work has been done, it tries to enter the synchronized block at (5). Since other threads may call emit() just before it enters, more work may appear that needs to be handled. Because only a single thread can enter the block at a time and we use the missed flag, the emitter-loop entering the block will either see that there is no more work and quit emitting, or loop again to process the additional work.
  6. If no other thread called emit() while the emitter reached the synchronized block at (5), we quit emitting (setting the flag to false). Note that since the block allows one thread at a time, a subsequent entry at (1) will see the flag as false and start emitting on its own.
  7. If there was missed work, we reset the missed flag and loop again. Resetting the flag is essential to avoid infinite emission loops.
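To make the skeleton above concrete, here is a hypothetical variant (the class name and the rounds counter are mine, for illustration only) where the "emission work" at (4) simply increments a counter:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative emitter-loop where the emission work is just counting
// how many rounds of work actually ran.
class EmitterLoopDemo {
    boolean emitting;                               // guarded by 'this'
    boolean missed;                                 // guarded by 'this'
    final AtomicInteger rounds = new AtomicInteger();

    public void emit() {
        synchronized (this) {
            if (emitting) {
                missed = true;    // the current emitter will do our work
                return;
            }
            emitting = true;      // we won the right to emit
        }
        for (;;) {
            rounds.incrementAndGet();   // the actual "emission work" (4)
            synchronized (this) {
                if (!missed) {
                    emitting = false;   // nothing left, release
                    return;
                }
                missed = false;         // consume the missed indicator
            }
        }
    }
}
```

Note that several concurrent emit() calls may collapse into a single extra round: the missed flag only records that more work exists, not how much, which is why richer indicators (such as a list) are used in real operators.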
Now in real-life operators, there are several typical ways to 'queue' up work for the emitter-loop.

One such way is to use a thread-safe data structure in front of the emitter-loop: threads append work to it up front, then the emitter-loop removes and processes work until the data structure becomes empty.

To demonstrate this, here is a simple example that serializes the emission of values of type T.

class ValueEmitterLoop<T> {
    Queue<T> queue = new MpscLinkedQueue<>();    // (1)
    boolean emitting;
    Consumer<? super T> consumer;                // (2)

    public void emit(T value) {
        Objects.requireNonNull(value);
        queue.offer(value);                      // (3)
        synchronized (this) {
            if (emitting) {
                return;                          // (4)
            }
            emitting = true;
        }
        for (;;) {
            T v = queue.poll();                  // (5)
            if (v != null) {
                consumer.accept(v);              // (6)
            } else {
                synchronized (this) {
                    if (queue.isEmpty()) {       // (7)
                        emitting = false;
                        return;
                    }
                }
            }
        }
    }
}

In the ValueEmitterLoop example, we have a slightly different emission logic going on:

  1. We use a thread-safe queue to hold onto values that need to be emitted. Since there can be many threads calling emit() but only one polling the queue, using Java's ConcurrentLinkedQueue would add unnecessary overhead. Instead, I suggest using JCTools' optimized queue implementations. The example uses an unbounded queue variant, but if the maximum number of elements to be queued is known (because the emitter-loop participates in a bounded backpressure scenario), one can use MpscArrayQueue instead.
  2. The example simply uses a Consumer callback to emit the values to.
  3. First, we enqueue the non-null value no matter what (JCTools doesn't support null values).
  4. If we enter the synchronized block but find another thread emitting, we just quit. There is no need for a missed flag here because the non-emptiness of the queue itself indicates more work.
  5. In the loop, we poll elements out of the queue; since we don't allow null elements, a null return indicates the queue is (momentarily) empty.
  6. Each non-null value is handed over to the consumer.
  7. Once we enter the synchronized block, we check if the queue is still empty and if so, the loop quits after setting the emitting flag to false. Since we offer to the queue outside any synchronized block, it is possible that more values are enqueued at (3) between the null poll at (5) and the check at (7). There are two interleavings at play here: a) a new value is enqueued before (7) executes, and thus we loop again; b) a new value is enqueued after (7), in which case this emitter quits and the other thread enters the emitter-loop and continues polling the queue. In conclusion, no signal is lost.
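To try this class standalone, one can substitute the JDK's ConcurrentLinkedQueue for the JCTools queue (a functional, if slower, stand-in so the sketch compiles without extra dependencies); the rest is the same logic as above:

```java
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

class ValueEmitterLoop<T> {
    // Stand-in for JCTools' MpscLinkedQueue; same null-rejecting contract.
    final Queue<T> queue = new ConcurrentLinkedQueue<>();
    boolean emitting;                      // guarded by 'this'
    Consumer<? super T> consumer;          // set before the first emit()

    public void emit(T value) {
        Objects.requireNonNull(value);
        queue.offer(value);                // enqueue up front, outside any lock
        synchronized (this) {
            if (emitting) {
                return;                    // the current emitter will pick it up
            }
            emitting = true;
        }
        for (;;) {
            T v = queue.poll();
            if (v != null) {
                consumer.accept(v);        // serialized delivery
            } else {
                synchronized (this) {
                    if (queue.isEmpty()) {
                        emitting = false;  // done; let the next caller emit
                        return;
                    }
                }
            }
        }
    }
}
```

A re-entrant emit() from inside the consumer is also safe here: the inner call merely enqueues the value and returns, and the outer loop delivers it afterwards.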
The second way of 'queueing' work is to do it inside the synchronized blocks, so non-thread-safe data structures can be employed.

An example of this is what happens in RxJava's SerializedObserver class. I'll show this by changing the previous example:

class ValueListEmitterLoop<T> {
    List<T> queue;                           // (1)
    boolean emitting;
    Consumer<? super T> consumer;

    public void emit(T value) {
        synchronized (this) {
            if (emitting) {
                List<T> q = queue;
                if (q == null) {
                    q = new ArrayList<>();   // (2)
                    queue = q;
                }
                q.add(value);
                return;
            }
            emitting = true;
        }
        consumer.accept(value);              // (3)
        for (;;) {
             List<T> q;
             synchronized (this) {           // (4)
                 q = queue;
                 if (q == null) {            // (5)
                     emitting = false;
                     return;
                 }
                 queue = null;               // (6)
             }
             q.forEach(consumer);            // (7)
        }        
    }
}

The ValueListEmitterLoop has more complicated logic in its synchronized blocks, but the behavior is fairly straightforward:

  1. We use a classic java.util.List to 'queue' up work. In addition, it serves as an indicator of missed work when it is non-null. At the beginning, there is no missed work and thus the field does not reference any list instance.
  2. If we find some other thread emitting, we append to the queue (initializing it to an ArrayList if no other missed work has happened so far).
  3. The thread that won the emission right can now emit the value directly to the consumer. There is no need to run it through the queue (saving time), and there is no need to check the queue for elements at this point because it can only be null here (the loop is exhaustive and exits only when queue is null).
  4. At this point, we may have queued-up values, so we enter a synchronized block and see if queue is non-null.
  5. If the queue is null, we quit emitting. Since the queue is created/appended to in another synchronized block, there is no race and no possibility of a lost emission opportunity.
  6. We set queue to null, which indicates no more values are available (at this point). This makes sure that if the loop starts over and no emit() is called in the meantime, the loop will quit emitting after the check at (5).
  7. We simply for-each over the values in q, which is now thread-safe because the list has been 'disconnected' from the class and no other thread can see it.
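One property worth demonstrating: if the consumer itself calls emit() re-entrantly, the inner call simply queues the value and the outer loop delivers it in order afterwards. A standalone copy of the class makes this easy to verify (the demo wiring in the test is mine):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ValueListEmitterLoop<T> {
    List<T> queue;                           // doubles as the missed-work indicator
    boolean emitting;                        // guarded by 'this'
    Consumer<? super T> consumer;            // set before the first emit()

    public void emit(T value) {
        synchronized (this) {
            if (emitting) {
                List<T> q = queue;
                if (q == null) {
                    q = new ArrayList<>();   // lazily create the missed-work list
                    queue = q;
                }
                q.add(value);
                return;
            }
            emitting = true;
        }
        consumer.accept(value);              // fast path: emit directly
        for (;;) {
            List<T> q;
            synchronized (this) {
                q = queue;
                if (q == null) {             // nothing missed, release and quit
                    emitting = false;
                    return;
                }
                queue = null;                // disconnect the list from the class
            }
            q.forEach(consumer);             // safe: no other thread sees q
        }
    }
}
```

In the re-entrant case, the inner emit() finds emitting == true, appends to the list and returns immediately, so the consumer is never invoked recursively.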
Unfortunately, calls such as consumer.accept() may throw an unchecked exception, leaving our emitter-loop stuck in the emitting state. This comes up frequently when the emitter-loop is used for emitting error events besides regular values (see Notification and NotificationLite). In such cases, once the exception is thrown, there will be another call to emit() that needs to get through.

To avoid this situation, one could wrap each call in a try-catch, but usually the caller of emit() needs to be informed about the problem. Instead, we can simply let the exception propagate up naturally and, on the way out, set the emitting flag back to false.


    // same as above
    // ...
    public void emit(T value) {
        synchronized (this) {
            // same as above
            // ...
        }
        boolean skipFinal = false;             // (1)
        try {
            consumer.accept(value);            // (5)
            for (;;) {
                List<T> q;
                synchronized (this) {           
                    q = queue;
                    if (q == null) {            
                        emitting = false;
                        skipFinal = true;      // (2)
                        return;
                    }
                    queue = null;
                }
                q.forEach(consumer);           // (6)
            }
        } finally {
            if (!skipFinal) {                  // (3)
                synchronized (this) {
                    emitting = false;          // (4)
                }
            }
        }
    }

Since Java 6 doesn't allow rethrowing a Throwable exception (the default catch type in RxJava) from a catch block, we need to use a finally block to hook into the exception-throwing process.


  1. We declare a boolean skipFinal variable. If set to true, it indicates a normal completion and we will skip the logic in the finally block.
  2. On the emptied-queue path, we set skipFinal to true so the check at (3) fails, skipping the synchronized block altogether. We need this because an unconditional reset of emitting could accidentally unlock a running emitter-loop for another thread, due to the time window between (2) and (3).
  3. If either of the accept() calls at (5) or (6) throws an exception, skipFinal will still be false, which allows the program flow to enter the synchronized block. Once the finally block completes, the thrown exception continues its way up the call chain.
  4. By setting emitting back to false, we allow subsequent calls to emit() to succeed.
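Putting the pieces together, here is a standalone copy of the exception-safe variant (the class name ExceptionSafeEmitterLoop is mine); a throwing consumer propagates the exception but leaves the loop usable for the next emit():

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ExceptionSafeEmitterLoop<T> {
    List<T> queue;                           // missed-work list, guarded by 'this'
    boolean emitting;                        // guarded by 'this'
    Consumer<? super T> consumer;            // set before the first emit()

    public void emit(T value) {
        synchronized (this) {
            if (emitting) {
                List<T> q = queue;
                if (q == null) {
                    q = new ArrayList<>();
                    queue = q;
                }
                q.add(value);
                return;
            }
            emitting = true;
        }
        boolean skipFinal = false;           // true only on normal completion
        try {
            consumer.accept(value);          // may throw
            for (;;) {
                List<T> q;
                synchronized (this) {
                    q = queue;
                    if (q == null) {
                        emitting = false;
                        skipFinal = true;    // normal exit: skip the finally logic
                        return;
                    }
                    queue = null;
                }
                q.forEach(consumer);         // may throw
            }
        } finally {
            if (!skipFinal) {                // reached only via an exception
                synchronized (this) {
                    emitting = false;        // release so later emit() calls succeed
                }
            }
        }
    }
}
```

The exception still reaches the caller of emit(); the finally block only repairs the emitting flag on its way out.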
You may have heard that using synchronized in concurrent code can hurt performance, so why do we use it so much in RxJava?

Generally, the first rule of thumb is to measure the performance of your code before assuming anything about it. We did this in RxJava and came to a surprising discovery: synchronized gives better throughput (in some benchmarks, I might add).

Since RxJava is not opinionated about threading/scheduling, it should work in synchronous and asynchronous scenarios. Many of the applications using RxJava run their streams in mostly synchronous fashion.

Now, the JVM favors synchronous, single-threaded operation, and to help such code, it features optimizations such as biased locking and lock elision. Essentially, these two can get rid of the synchronized blocks in the examples above, allow further optimizations and thus make a sequential benchmark perform better (sometimes reducing the overhead by an order of magnitude).

In contrast, alternatives using lock-free atomics (such as the queue-drain I'll blog about in the next part) incur an unavoidable overhead, and if complex state is involved, they can increase the memory-allocation rate.

Of course, if the JVM detects contention on these locks, biased locking is revoked and they behave as regular locks from then on. (Note that this revocation is a stop-the-world event and can induce significant latency.)

Such a probabilistic performance gain is hard to pass up; therefore, RxJava decided to rely on it as long as possible.

Conclusion

Some of the most important algorithms one needs to understand when writing RxJava operators are those that serialize event emission, that is, ensure the work is run by a single thread at a time. In this blog post, I've introduced the so-called emitter-loop approach, which gives decent synchronous performance compared to the alternatives.

Due to its blocking nature and the cost of bias revocation, I advise employing this technique in operators that either run synchronously or are asynchronous less than 50% of the time.

However, if the asynchrony rate is above 50%, or one can be sure that different threads will be involved all the time (such as in observeOn), there is a potentially better alternative I call queue-drain, which I will blog about in the next part.

10 comments:

  1. Thanks for the mention of JCTools, and for your feedback and involvement :-)

    1. I thank you for JCTools; learned a lot from the sources and your blog posts!

  2. I love this blog...your articles have been extremely helpful for shortening the RxJava learning curve and cementing the API definitions into something more tangible. Thank you so much!

  3. In the sentence "If there were missed work, we reset the missing flag and loop again. Resetting the flag is essential to avoid infinite emission loops", we reset the missed flag rather than missing flag, right?

  4. Dear David, I have started the process of translating this Advanced RxJava blog series to Chinese, it's hosted on http://blog.piasy.com/AdvancedRxJava/ , and the source is hosted on Github: https://github.com/Piasy/AdvancedRxJava.

    This article is translated, link: http://blog.piasy.com/AdvancedRxJava/2016/05/06/operator-concurrency-primitives/.

  5. Hi David! Amazing article. One thing: could you elaborate on the deadlock conditions you briefly referenced in the intro? An example would really help. Thanks!

  6. Thank you so much. Hope you keep writing and sharing knowledge.

  7. Hi David.

    In the `ValueListEmitterLoop` class in the first `synchronized` block you have this code:

    ```
    if (emitting) {
        List<T> q = queue;
        if (q == null) {
            q = new ArrayList<>(); // (2)
            queue = q;
        }
        q.add(value);
        return;
    }
    emitting = true;
    ```

    Why is there an extra variable `List<T> q`? What's the difference between the existing code and this one:

    ```
    if (emitting) {
        if (queue == null) {
            queue = new ArrayList<>(); // (2)
        }
        queue.add(value);
        return;
    }
    emitting = true;
    ```
    Thanks.

    1. To emphasize that avoiding multiple reads of a field (by using a local variable) is preferable. Here, there is no functional difference. As time passes, coding style gets refined, but old code is often not updated for the sake of prettiness alone.
