Monday, September 11, 2017

Interoperation between RxJava and Kotlin Coroutines

Introduction


Writing imperative-looking code with Kotlin Coroutines is certainly attractive, but I suspect things can get quite convoluted pretty fast once, for example, Selectors are involved.

I haven't gotten around to looking at what Selectors are; I only read that they can help you implement a flatMap-like stream combiner. We are not going to do that now; RxJava can do it for us, after all.

However, the reasonable question arises: if I have a coroutine generator, a coroutine transformation or simply want to receive items from a Flowable, how can I make RxJava work with these coroutines?

Easily, with the combined magic of Kotlin Coroutines and RxJava!


Suspendable Emitter


A generator is a source-like construct that emits items followed by a terminal signal. It should be familiar from RxJava as the Flowable.generate() operator, which gives you an Emitter with the usual onNext, onError and onComplete calls on it.

One limitation is that you can call onNext only once per invocation of the (Bi)Consumer lambda that receives the emitter. The reason is that we can't block a second call to onNext and we don't want to buffer it either; therefore, RxJava relies on the developer's cooperation.

Compiler-supported suspension and the state machine the compiler builds, however, allow us to hold back a second call by suspending it until there is demand from the downstream, which then resumes the coroutine where it left off. Therefore, we can lift the single-onNext requirement for our coroutine-based generator.

So let's define the SuspendEmitter interface:


interface SuspendEmitter<in T> : CoroutineScope {

    suspend fun onNext(t: T)

    suspend fun onError(t: Throwable)

    suspend fun onComplete()
}


By extending CoroutineScope, we provide useful infrastructure (i.e., coroutineContext, isActive) to the block that targets our SuspendEmitter. One could ask why we need onError and onComplete at all, since a coroutine can simply throw or end. The reason is that this way, a coroutine can terminate the sequence from within a transformation (which we'll see later), just like our recent mapFilter operator allows it.


The flow-producer

Given our context-providing interface for a generator coroutine, let's define the generator function the user will call:


fun <T> produceFlow(generator: suspend SuspendEmitter<T>.() -> Unit) : Flowable<T> {
    return Produce(generator)
}


(For those unfamiliar with the Kotlin syntax, SuspendEmitter<T>.() -> Unit is practically a one-parameter lambda of signature (param: SuspendEmitter<T>) -> Unit where, inside the lambda's body, members of param don't need to be qualified, thus you can write onNext(1) instead of param.onNext(1).)

We have to implement a Flowable that interacts with a suspendable generator function in some fashion. When implementing source-like operators, one usually has to write a Subscription instance and call Subscriber.onSubscribe() with it.

class Produce<T>(private val generator: suspend SuspendEmitter<T>.() -> Unit) : 
        Flowable<T>() {
    override fun subscribeActual(s: Subscriber<in T>) {
        launch(Unconfined) {
            val parent = ProduceSubscription(s, coroutineContext)
            parent.setJob(coroutineContext[Job])
            s.onSubscribe(parent)
            generator(parent)
        }
    }
}


Since the generator is a suspendable coroutine, we need a context where it can run. The Unconfined context gives us a trampolined execution environment where resumptions of suspended coroutines are not confined to any particular thread, as if you'd run with the trampoline() Scheduler in RxJava.

We create our Subscription, attach the Job of the coroutine's own context to bridge cancellation from a downstream Subscription.cancel(), signal the custom Subscription to the downstream and then execute the provided generator block, supplying it the parent, which also implements SuspendEmitter.

So far, nothing is too hairy or convoluted; however, the interaction between RxJava's request-driven Subscription and the Kotlin Coroutine suspension infrastructure is more involved.

Non-blocking await/notify

We need a way to suspend the generator coroutine when there are no downstream requests and to resume it when the downstream does request an amount. This resembles the wait-notify pair of a typical BlockingQueue implementation, where an emission blocked on a full queue gets unblocked by a notification from a concurrent take()/poll() invocation. Since we don't want to block, and the coroutine infrastructure supports resuming a coroutine programmatically, we'll use this feature in two helper methods establishing a non-blocking wait-notify exchange:


typealias Cont = Continuation<Unit>

fun notify(ref: AtomicReference<Cont?>) {
    while (true) {
        val cont = ref.get()
        if (cont != null && cont != TOKEN) {
            // a suspended await() is parked: clear it, then resume it
            if (ref.compareAndSet(cont, null)) {
                cont.resume(Unit)
                break
            }
        } else {
            // nobody is waiting: leave TOKEN so the next await() resumes at once
            if (ref.compareAndSet(cont, TOKEN)) {
                break
            }
        }
    }
}


We use a valueless Continuation<Unit>, Cont for short, and atomics to place either an indicator or an actual continuation object in an AtomicReference. notify() atomically performs the following logic: if there is a real continuation in the reference, we clear it and then call resume on it to trigger the resumption. Otherwise, we set it to the shared TOKEN object, indicating that the next time the other side, await(), wants to suspend, it can resume immediately.

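fun await(ref: AtomicReference<Cont?>, cont: Cont) {
    while (true) {
        val a = ref.get()
        if (a == TOKEN) {
            // a notification already arrived: consume it and resume right away
            if (ref.compareAndSet(a, null)) {
                cont.resume(Unit)
                break
            }
        } else {
            // park the continuation until notify() resumes it
            if (ref.compareAndSet(a, cont)) {
                break
            }
        }
    }
}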


The await() method uses the same reference and the continuation instance provided by suspendCoroutine in its code block. The method atomically checks if there is a TOKEN and, if so, clears it from the reference and calls resume on the continuation parameter. Otherwise, it stores the continuation in the reference and quits.

val TOKEN: Cont = object: Cont {
    override val context: CoroutineContext
        get() = throw UnsupportedOperationException()

    override fun resume(value: Unit) {
        throw UnsupportedOperationException()
    }

    override fun resumeWithException(exception: Throwable) {
        throw UnsupportedOperationException()
    }

}


Finally, the TOKEN is just an empty implementation of a Continuation; we should never call its methods, as the object reference itself serves only as an indicator for immediate resumption.
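To see the handshake outside of coroutines, here is a minimal Java sketch of the same await/notify exchange, assuming a Runnable stands in for Continuation<Unit> and run() for resume(Unit). The AwaitNotify class and the method name signal() are hypothetical (signal() simply avoids confusion with Object.notify()); like the Kotlin version, it assumes at most one waiter at a time.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical Java analogue of the Kotlin await()/notify() pair:
// a Runnable stands in for Continuation<Unit> and run() for resume(Unit).
final class AwaitNotify {

    // Shared sentinel: "a notification arrived before anyone waited".
    static final Runnable TOKEN = () -> {
        throw new UnsupportedOperationException("TOKEN must never be run");
    };

    private AwaitNotify() { }

    // Resumes a parked continuation, or leaves TOKEN behind for the next await().
    static void signal(AtomicReference<Runnable> ref) {
        while (true) {
            Runnable cont = ref.get();
            if (cont != null && cont != TOKEN) {
                if (ref.compareAndSet(cont, null)) {
                    cont.run();          // resume after clearing the slot
                    return;
                }
            } else if (ref.compareAndSet(cont, TOKEN)) {
                return;                  // nobody waiting: remember the signal
            }
        }
    }

    // Runs cont right away if TOKEN is present, otherwise parks it in ref.
    static void await(AtomicReference<Runnable> ref, Runnable cont) {
        while (true) {
            Runnable a = ref.get();
            if (a == TOKEN) {
                if (ref.compareAndSet(a, null)) {
                    cont.run();          // the signal was already there
                    return;
                }
            } else if (ref.compareAndSet(a, cont)) {
                return;                  // parked until signal() runs it
            }
        }
    }
}
```

Whichever side arrives first leaves its mark in the reference, so neither side ever blocks; the second arrival completes the exchange.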



The ProduceSubscription  

Now we can implement the ProduceSubscription class. First, let's see the skeleton with the relevant fields:

open class ProduceSubscription<T>(
        private val actual: Subscriber<in T>,
        private val ctx : CoroutineContext
) : Subscription, SuspendEmitter<T> {

    companion object {
        val CANCELLED = Any()
    }

    @Suppress("DEPRECATION")
    override val context: CoroutineContext
        get() = ctx

    override val isActive: Boolean
        get() = job.get() != CANCELLED

    private val job = AtomicReference<Any>()

    private val requested = AtomicLong()

    private val resume = AtomicReference<Cont?>()

    private var done: Boolean = false

    override suspend fun onNext(t: T) {
        // TODO implement
    }

    override suspend fun onError(t: Throwable) {
        // TODO implement
    }

    override suspend fun onComplete() {
        // TODO implement
    }

    override fun cancel() {
        // TODO implement
    }

    override fun request(n: Long) {
        // TODO implement
    }

    fun setJob(j: Job?) {
        // TODO implement
    }
}

We see the methods of both Subscription and SuspendEmitter along with a couple of fields/properties:


  • It takes the downstream's Subscriber and the CoroutineContext it will provide to the generator block in the operator.
  • We use the companion object's CANCELLED value to indicate that the parent Job obtained from the coroutineContext has been cancelled, ensuring it is cancelled exactly once.
  • It is considered active as long as the job reference does not hold the CANCELLED indicator.
  • Otherwise, the job AtomicReference holds the Job itself.
  • We have to track the requested amount from downstream via an AtomicLong.
  • The resume AtomicReference stores the continuation used by the non-blocking await-notify shown in the previous section.
  • Finally, the done flag ensures that onError or onComplete from the generator coroutine reaches the downstream at most once.
Perhaps the main difficulty lies in the implementation of the onNext method, as it is the primary interaction point between the downstream and a coroutine that has to be suspended if there are no requests:


    override suspend fun onNext(t: T) {
        if (job.get() == CANCELLED) {
            suspendCoroutine { }
        }
        val r = requested.get()
        if (r == 0L) {
            suspendCoroutine { cont -> await(resume, cont)  }
        }

        actual.onNext(t)

        if (job.get() == CANCELLED) {
            suspendCoroutine { }
        }
        if (resume.get() == TOKEN) {
            resume.compareAndSet(TOKEN, null)
        }
        if (r != Long.MAX_VALUE) {
            requested.decrementAndGet()
        }
    }


First, we check if the downstream has cancelled the generator, in which case we should get out of the coroutine entirely. I'm not sure if there is a more appropriate way of doing this other than suspending indefinitely.

Next, we check the requested amount and, if it is zero, we suspend the current coroutine by using our non-blocking await mechanism. Once notified, or if there was at least one outstanding request, the code continues with the emission of the item. Since the downstream may cancel from within its onNext, we check for cancellation again and, if so, suspend the coroutine indefinitely.

Since the downstream can request some amount immediately within the s.onSubscribe(parent) call in the operator, before the generator even runs and calls onNext, the resume field may hold a stale TOKEN. That TOKEN would incorrectly tell the next await() call that it can resume immediately, violating the backpressure we expect, so we clear it after each emission. I know this sounds convoluted, but I learned it the hard way...

Finally, we decrement the request amount if not unbounded.

The onError and onComplete look pretty much alike:


    override suspend fun onError(t: Throwable) {
        if (!done) {
            done = true
            actual.onError(t)
            cancel()
        }
        suspendCoroutine { }
    }

    override suspend fun onComplete() {
        if (!done) {
            done = true
            actual.onComplete()
            cancel()
        }
        suspendCoroutine { }
    }


We set the done flag to true, emit the relevant event to the downstream and then cancel the job/Subscription we are running with. I defensively suspend the coroutine afterwards.

Next, let's see how cancel() and setJob() work:

    override fun cancel() {
        val o = job.getAndSet(CANCELLED)
        if (o != CANCELLED) {
            // o is null if cancel() runs before setJob() was ever called
            (o as Job?)?.cancel()
        }
    }

    fun setJob(j: Job?) {
        while (true) {
            val o = job.get()
            if (o == CANCELLED) {
                j?.cancel()
                break
            }
            if (job.compareAndSet(o, j)) {
                break
            }
        }
    }


They are implemented along the lines of RxJava's typical deferred cancellation mechanism: cancel() atomically swaps in the CANCELLED indicator and calls cancel on the Job it contained, while setJob() atomically sets the Job instance or cancels it if cancel() swapped in the CANCELLED indicator just before.
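The deferred cancellation pattern is not tied to coroutines. Below is a minimal Java sketch of it, assuming a hypothetical Cancellable interface in place of Job and a hypothetical DeferredCancel class in place of ProduceSubscription. It also covers cancel() running before any job has been set: the late job is then cancelled as soon as setJob() sees the CANCELLED sentinel.

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of RxJava-style deferred cancellation; Cancellable is a
// hypothetical stand-in for a coroutine Job.
final class DeferredCancel {

    interface Cancellable {
        void cancel();
    }

    // Sentinel stored once cancel() has run; never replaced afterwards.
    private static final Object CANCELLED = new Object();

    private final AtomicReference<Object> job = new AtomicReference<>();

    void cancel() {
        Object o = job.getAndSet(CANCELLED);
        // Only the first cancel() sees a non-CANCELLED value; null means no job yet.
        if (o != CANCELLED && o != null) {
            ((Cancellable) o).cancel();
        }
    }

    void setJob(Cancellable j) {
        while (true) {
            Object o = job.get();
            if (o == CANCELLED) {
                // cancel() won the race: cancel the late-arriving job right away
                if (j != null) {
                    j.cancel();
                }
                return;
            }
            if (job.compareAndSet(o, j)) {
                return;
            }
        }
    }

    boolean isCancelled() {
        return job.get() == CANCELLED;
    }
}
```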

Lastly, the request() implementation is responsible for accounting downstream requests and resuming the generator if it is suspended inside onNext():

    override fun request(n: Long) {
        if (BackpressureHelper.add(requested, n) == 0L) {
            notify(resume)
        }
    }


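In the RxJava world, a transition from 0 to n triggers the emission loop in, for example, the range() operator. Here, BackpressureHelper.add returns the previous requested amount, so a 0 result marks that transition, and we notify a possibly suspended coroutine, which then resumes from the await() call we implemented.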
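For readers unfamiliar with BackpressureHelper.add, here is a hedged Java sketch of capped request accounting along the lines of what it does: add n to the requested amount, saturate at Long.MAX_VALUE (meaning unbounded) on overflow, and return the previous value so the caller can detect the 0 to n transition. The Requests class is hypothetical and not RxJava's actual implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of capped, lock-free request accounting.
final class Requests {

    private Requests() { }

    // Adds n (> 0) to requested, capping at Long.MAX_VALUE; returns the previous value.
    static long add(AtomicLong requested, long n) {
        while (true) {
            long r = requested.get();
            if (r == Long.MAX_VALUE) {
                return Long.MAX_VALUE;   // already unbounded, nothing to add
            }
            long u = r + n;
            if (u < 0L) {
                u = Long.MAX_VALUE;      // overflow: treat as unbounded
            }
            if (requested.compareAndSet(r, u)) {
                return r;                // 0 here means "resume the producer"
            }
        }
    }
}
```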

Testing it is simple with RxJava:


val f = produceFlow {
    for (i in 0 until 10) {
         println("Generating $i")
         onNext(i)
    }
    onComplete()
}

f.test(0)
.assertEmpty()
.requestMore(5)
.assertValues(0, 1, 2, 3, 4)
.requestMore(5)
.assertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)


Outstanding!

The flow-transformer

Now that we have a way to emit items, we would like to emit items in response to upstream values, like the map() operator, but with a suspendable coroutine function. RxJava's map is confined to returning exactly one item for each upstream item.

With coroutines and the ProduceSubscription described in the previous section, we could emit any number of items without overflowing a Subscriber!

Let's define our API and a skeleton implementation for it first:


fun <T, R> Flowable<T>.transform(
        transformer: suspend SuspendEmitter<R>.(T) -> Unit)
 : Flowable<R> {
    return Transform(this, transformer)
}

class Transform<T, R>(
        private val source: Flowable<T>,
        private val transformer: suspend SuspendEmitter<R>.(T) -> Unit)
 : Flowable<R>() {
    override fun subscribeActual(s: Subscriber<in R>) {
        // TODO implement
    }
}


We define a transform extension function on Flowable with a suspendable transformer that receives our SuspendEmitter and the upstream's value, and returns nothing.

This time, we have an upstream that we have to subscribe to via a regular RxJava FlowableSubscriber; we call the coroutine for each upstream item and make sure we keep requesting more values from the upstream, transitively honoring the backpressure of the coroutine itself.

The first step in this direction is handling the upstream's own Subscription we get through Subscriber.onSubscribe. We have to attach it to the Subscription we show to the downstream Subscriber. Since we will use the ProduceSubscription anyway, we extend it and override its cancel() for this purpose:


class ProduceWithResource<T>(
        actual: Subscriber<in T>,
        ctx : CoroutineContext
) : ProduceSubscription<T>(actual, ctx) {
    private val resource = AtomicReference<Subscription>()

    fun setResource(s: Subscription) {
        SubscriptionHelper.replace(resource, s)
    }

    override fun cancel() {
        SubscriptionHelper.cancel(resource)
        super.cancel()
    }
}


We simply use the deferred cancellation helper for Subscriptions.

Now let's see how we can prepare the context for running the coroutine inside the transform operator's subscribeActual() method:

    val ctx = newCoroutineContext(Unconfined)
    val parent = ProduceWithResource(s, ctx)
    s.onSubscribe(parent)
    source.subscribe(object: FlowableSubscriber<T> {

        var upstream : Subscription? = null

        val wip = AtomicInteger()
        var error: Throwable? = null

        override fun onSubscribe(s: Subscription) {
            // TODO implement
        }

        override fun onNext(t: T) {
            // TODO implement
        }

        override fun onError(t: Throwable) {
            // TODO implement
        }

        override fun onComplete() {
            // TODO implement
        }
    })


First, we create an unconfined context in which each invocation of the transformer coroutine will execute and suspend. We create the producer that can hold an additional Subscription and send it to the downstream Subscriber. Finally, we subscribe to the upstream with a FlowableSubscriber.

In this custom FlowableSubscriber, we will have to request from the upstream, thus we save the Subscription we get from it. The wip and error fields will be used to achieve something similar to half-serialization; I'll explain it once the methods are implemented.

Handling onSubscribe() is straightforward and typical for an RxJava operator:


    override fun onSubscribe(s: Subscription) {
        upstream = s
        parent.setResource(s)
        s.request(1)
    }


We store the upstream's Subscription locally and in the ProduceWithResource to link up cancellation across the operator. Then we request one item; this is partly to simplify the interaction between a suspended coroutine and the upstream producer. Using a larger prefetch would require some intermediate queue, which is possible, but left to the reader as an exercise. (Finally, we found a use for request(1)!)

Next, onNext():

    override fun onNext(t: T) {
        launch(ctx) {
            parent.setJob(coroutineContext[Job])

            wip.getAndIncrement()

            transformer(parent, t)

            if (wip.decrementAndGet() == 0) {
                upstream!!.request(1)
            } else {
                val ex = error
                if (ex == null) {
                    s.onComplete()
                } else {
                    s.onError(ex)
                }
                parent.cancel()
            }
        }
    }

First, the Job of the current coroutineContext has to be stored so that a downstream cancellation can call its Job.cancel() method. We have to do this for every item because each upstream item is handled in a fresh launch() with its own Job.

Next, the wip counter is incremented, which may seem odd. The reason is that if the transformer coroutine gets suspended, execution returns to the caller of onNext(), a regular RxJava producer of some sort. If this producer has reached its end, it will call onError or onComplete, as these can be issued without requests. As we'll see a bit later, forwarding these signals immediately would cut off any pending emission from the suspended coroutine; therefore, we use the half-serializer pattern to save this terminal indication.

The transformer is executed with the parent ProduceWithResource instance, which handles the suspendable onNext emissions towards the downstream.

Once the transformer has finished, execution continues (possibly after a resumption) with the atomic decrement of the wip counter. If it decrements to 0, no terminal event was signalled from the upstream while the transformer was suspended, thus we can request the next item from the upstream RxJava source. Otherwise, we emit the saved terminal event to the downstream and cancel the parent.

Fortunately, onError and onComplete are much simpler:


    override fun onError(t: Throwable) {
        error = t
        if (wip.getAndIncrement() == 0) {
            s.onError(t)
            parent.cancel()
        }
    }

    override fun onComplete() {
        if (wip.getAndIncrement() == 0) {
            s.onComplete()
            parent.cancel()
        }
    }


We store the Throwable (in onError only), then atomically increment the wip counter. If there was no ongoing coroutine, it is safe to emit the terminal event and clean up/cancel the contextual Job we may still be referencing. If the original wip value was 1, the increment bumps it to 2, and the decrement in onNext() will detect the terminal condition and act accordingly.
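The wip-based half-serialization of transform() can be tried out in isolation. Below is a minimal, single-threaded Java sketch, assuming a hypothetical HalfSerializer class whose string log stands in for the downstream and upstream calls; the log is a plain ArrayList, so it is only meant for this demonstration, not for concurrent use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the wip "half-serializer": a terminal signal that
// arrives while an item is in flight is deferred to the item's completion.
final class HalfSerializer {

    final AtomicInteger wip = new AtomicInteger();
    final List<String> log = new ArrayList<>();   // demo only, not thread-safe
    volatile Throwable error;

    // Processing of an upstream item starts (and may suspend afterwards).
    void itemStarted() {
        wip.getAndIncrement();
    }

    // Processing of the item finished, possibly after a resumption.
    void itemFinished() {
        if (wip.decrementAndGet() == 0) {
            log.add("request(1)");    // no terminal event arrived: ask for more
        } else {
            emitTerminal();           // a terminal event arrived meanwhile
        }
    }

    // onError (t != null) or onComplete (t == null) from the upstream.
    void terminate(Throwable t) {
        error = t;
        if (wip.getAndIncrement() == 0) {
            emitTerminal();           // nothing in flight: emit right away
        }
    }

    private void emitTerminal() {
        Throwable ex = error;
        log.add(ex == null ? "onComplete" : "onError: " + ex.getMessage());
    }
}
```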

Let's test it (by reusing the generator for fun)!

    f.transform({
        if (it % 2 == 0) {
            onNext(it)
        }
    })
    .test()
    .assertResult(0, 2, 4, 6, 8)

    f.transform({
        onNext(it)
        onNext(it + 1)
    })
    .test()
    .assertResult(0, 1, 1, 2, 2, 3, 3, 4, 4,
            5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10)

    f.transform({
        launch(CommonPool) {
            onNext(it + 1)
        }
    })
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


We can filter or amplify a source, synchronously or asynchronously if necessary with a single operator! Excellent!

The receiver

The last operation, given a Flowable, is to return to the coroutine world and consume the flow there. For that, a ReceiveChannel seems to be an appropriate output type, as it can be for-each looped nicely.

Let's define the extension method toReceiver() with a skeleton as well:


suspend fun <T> Flowable<T>.toReceiver(capacityHint: Int = 128) : ReceiveChannel<T> {
    val queue = Channel<T>(capacityHint)

    val upstream = AtomicReference<Subscription>()
    val error = AtomicReference<Throwable>()
    val wip = AtomicInteger()

    subscribe(object: FlowableSubscriber<T> {

        override fun onSubscribe(s: Subscription) {
            // TODO implement
        }

        override fun onNext(t: T) {
            // TODO implement
        }

        override fun onComplete() {
            // TODO implement
        }

        override fun onError(t: Throwable) {
            // TODO implement
        }

    })

    return // TODO implement
}


First, a Channel of type T with the given capacity is created. It is followed by the AtomicReference that will hold the source Flowable's Subscription, which has to be linked up with the consumer to propagate cancellation. Next, since the upstream may signal terminal events while the coroutine is suspended in a send() call (similar to the situation in the transform operator's onNext()), we will use the same AtomicInteger-based technique. The error AtomicReference serves as the intermediary when handing over the terminal event to the channel.

Let's see the FlowableSubscriber implementation first:

        override fun onSubscribe(s: Subscription) {
            if (SubscriptionHelper.setOnce(upstream, s)) {
                s.request(1)
            }
        }

        override fun onNext(t: T) {
            launch(Unconfined) {
                wip.getAndIncrement()

                queue.send(t)

                if (wip.decrementAndGet() == 0) {
                    upstream.get().request(1)
                } else {
                    // a terminal event arrived while send() was suspended;
                    // close() (unlike cancel()) keeps the already buffered items
                    queue.close(error.get())
                }
            }
        }

        override fun onComplete() {
            if (wip.getAndIncrement() == 0) {
                launch(Unconfined) {
                    queue.close()
                }
            }
        }

        override fun onError(t: Throwable) {
            error.lazySet(t)
            if (wip.getAndIncrement() == 0) {
                launch(Unconfined) {
                    queue.close(t)
                }
            }
        }


Practically, the FlowableSubscriber implementation performs the same bookkeeping as the transform() operator did, with the exception that the closing of the channel happens in a launch-provided context.

However, this is only the producer half of the channel; we still need the consumer part, more specifically, the consumer-reemitter. Luckily, the built-in produce() coroutine builder of the Coroutines library helps with it. Why not return the channel directly? Because we need a way to detect if the channel is closed from the consumer's end, and Channel doesn't allow us to register a completion handler for it. However, the Job inside the coroutineContext of produce() does:

    return produce(Unconfined) {
        coroutineContext[Job]?.invokeOnCompletion { 
            SubscriptionHelper.cancel(upstream) 
        }

        for (v in queue) send(v)
    }


Let's test this last operator:

runBlocking {
    for (i in f.toReceiver()) {
         println(i)
    }
    println("Done")

    for (i in f.subscribeOn(Schedulers.single()).toReceiver()) {
         println("Async $i")
    }
    println("Async Done")
}


Well done!

Conclusion

In this blog post, I demonstrated how one can write three operators, produceFlow, transform and toReceiver, that interoperate reasonably well with RxJava's own backpressure-enabled Flowable type.

This should show that the two technologies can, in the end, be combined by the developer as seen fit for the target domain or business requirements.

This was a somewhat heated week for me, so for now, until something interesting comes up on this topic, my writing about Kotlin Coroutines will be ... suspended.

Java 9 Flow API: ordered merge

Introduction

Sometimes, one has several ordered sequences of events and would like to merge them into one single flow. Since one element from a sequence should come before another element in another sequence, we need a way to keep comparing elements with each other from different sequences.

Unfortunately, zip() doesn't work because it takes a row of available items, and item #2 from sequence #2 may come before item #1 from sequence #3. Plus, if one sequence is shorter than the others, the resulting sequence stops early. Similarly, flatMap() doesn't work because it takes the next item from any inner source sequence the moment it is available, without any ordering considerations. At least it emits all items from all sources (provided there are no errors, of course).

Therefore, we need something between the two operators: one that collects up a row of items from the sources, decides which is the smallest/largest of them based on some comparison logic and only emits that. It then awaits a fresh item from that specific source (or completion) and repeats the picking of the smallest/largest item as long as there are requests for it.

Such an operator, let's call it orderedMerge(), has an implication about the number of its inner source sequences: it has to be fixed. The reason is that it has to pick the smallest/largest of the available items in order for the output to be in order. If a source is still missing, the operator can't know for sure that the available items are smaller/larger than whatever that missing source will produce.

The second implication is, what happens if the sources themselves are not ordered? The logic presented in this post still works, but the end output won't be totally ordered. It will act like some priority queue instead: picking important items first before turning to less important ones.
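The picking logic itself can be illustrated without any asynchrony. The following Java sketch is a hypothetical synchronous, pull-based version over Iterators: it keeps one head item per source, emits the smallest, and refills only the source that won. It assumes the sources never contain null (null marks an exhausted source) and it is not the Flow-based operator developed in this post.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical synchronous sketch of the "pick the smallest head" step.
final class OrderedMergeSketch {

    private OrderedMergeSketch() { }

    static <T> List<T> merge(Comparator<? super T> comparator, List<Iterator<T>> sources) {
        int n = sources.size();                    // the number of sources is fixed
        List<T> heads = new ArrayList<>(n);
        for (Iterator<T> it : sources) {
            heads.add(it.hasNext() ? it.next() : null);   // null marks a finished source
        }
        List<T> out = new ArrayList<>();
        while (true) {
            int minIndex = -1;
            for (int i = 0; i < n; i++) {
                T h = heads.get(i);
                // strict '<' keeps ties in source order
                if (h != null && (minIndex < 0 || comparator.compare(h, heads.get(minIndex)) < 0)) {
                    minIndex = i;
                }
            }
            if (minIndex < 0) {
                return out;                        // every source is exhausted
            }
            out.add(heads.get(minIndex));
            Iterator<T> it = sources.get(minIndex);
            heads.set(minIndex, it.hasNext() ? it.next() : null);   // refill the winner only
        }
    }
}
```

With ordered inputs, the output is fully ordered; feeding it an unordered source (for example [5, 1] merged with [3]) yields 3, 5, 1, the priority-queue-like behavior described above.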


The inner consumer

Operators handling multiple sources often need a way to prefetch items from these sources and hand them out on demand to some joining logic. This mainly happens by prefetching a fixed amount, putting items in a queue, calling the parent coordinator's drain() method, and batching up the replenishing calls from the coordinator if the so-called stable-prefetch backpressure is required.
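The drain() call mentioned here usually follows the queue-drain pattern. As a minimal Java sketch, assuming a hypothetical QueueDrain class and a Consumer standing in for the joining logic: whichever caller moves the work-in-progress counter away from 0 runs the loop, while concurrent or re-entrant calls merely bump the counter so the running loop checks the queue again.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch of the queue-drain (work-in-progress) pattern.
final class QueueDrain<T> {

    private final AtomicInteger wip = new AtomicInteger();
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final Consumer<? super T> consumer;

    QueueDrain(Consumer<? super T> consumer) {
        this.consumer = consumer;
    }

    void offer(T item) {
        queue.offer(item);
        drain();
    }

    void drain() {
        if (wip.getAndIncrement() != 0) {
            return;                          // someone else is draining and will loop again
        }
        int missed = 1;
        do {
            T item;
            while ((item = queue.poll()) != null) {
                consumer.accept(item);       // runs on one thread at a time
            }
            missed = wip.addAndGet(-missed); // account for drain() calls made meanwhile
        } while (missed != 0);
    }
}
```

Re-entrant calls (an item emitted from inside the consumer) are queued instead of recursing, which keeps the stack flat and the delivery order intact.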

For this purpose, let's see what the inner consumer of orderedMerge(), OrderedMergeInnerSubscriber, could look like:


static final class OrderedMergeInnerSubscriber<T> 
extends AtomicReference<Flow.Subscription>
implements Flow.Subscriber<T>, Flow.Subscription {

    final OrderedMergeSubscription<T> parent;

    final int prefetch;

    final int limit;

    final Queue<T> queue;

    int consumed;

    volatile boolean done;

    OrderedMergeInnerSubscriber(
        OrderedMergeSubscription<T> parent,
        int prefetch
    ) {
        this.parent = parent;
        this.prefetch = prefetch;
        this.limit = prefetch - (prefetch >> 2);
        this.queue = new ConcurrentLinkedQueue<>();
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }

    @Override
    public void request(long n) {
        // TODO implement
    }

    @Override
    public void cancel() {
        // TODO implement
    }
}

We'll need a reference to the coordinator of the operator; the prefetch amount, 75% of which we will use for replenishing requests; a queue (ConcurrentLinkedQueue for simplicity, but the bounded SpscArrayQueue from JCTools works here as well); and a counter of how many items have been consumed so far, to know when to replenish.


    @Override
    public void onSubscribe(Flow.Subscription s) {
        if (compareAndSet(null, s)) {
            s.request(prefetch);
        } else {
            s.cancel();
        }
    }

    @Override
    public void onNext(T item) {
        queue.offer(item);
        parent.drain();
    }

    @Override
    public void onError(Throwable throwable) {
        parent.onInnerError(this, throwable);
    }

    @Override
    public void onComplete() {
        done = true;
        parent.drain();
    }

    @Override
    public void request(long n) {
        int c = consumed + 1;
        if (c == limit) {
            consumed = 0;
            Flow.Subscription s = get();
            if (s != this) {
                s.request(c);
            }
        } else {
            consumed = c;
        }
    }

    @Override
    public void cancel() {
        Flow.Subscription s = getAndSet(this);
        if (s != null && s != this) {
            s.cancel();
        }
    }


I'd say there is nothing too complicated here.

  • onSubscribe() saves the upstream Flow.Subscription in the AtomicReference the class extends, unless one is already set or the consumer has been cancelled. If successful, the prefetch amount is requested; otherwise, the incoming Flow.Subscription is cancelled.
  • onNext() stores the item in the queue and calls drain() on the parent to handle it.
  • onError() defers the error signal to the parent coordinator: the parent may save up the errors or cancel the whole flow at once.
  • onComplete() sets the done indicator, which tells the parent that this particular source will not produce more values and thus can be skipped when looking for the next smallest/largest item to emit.
  • request() will only be called by the parent, to replenish one item from its perspective once the previous item has been chosen as the next item to be emitted towards the downstream. Since replenishing one by one is costly, we batch those up via the consumed counter. Once that counter reaches the limit (75% of prefetch), a request is issued to the upstream. Since the AtomicReference holds this as a cancellation indicator, we don't want to call request on ourselves. It's important to note that request() is guaranteed to be called from one thread at a time by virtue of the queue-drain approach the coordinator implements below.
  • cancel() atomically swaps in this as a terminal indicator and cancels the previous Flow.Subscription if it is non-null and not this.
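The replenishment arithmetic can be checked in isolation. This Java sketch, assuming a hypothetical Replenisher class and a LongConsumer in place of the upstream Flow.Subscription, mirrors the consumed/limit logic of request(): with a prefetch of 16, the limit is 16 - (16 >> 2) = 12, so every 12th consumed item triggers one upstream request of 12.

```java
import java.util.function.LongConsumer;

// Hypothetical sketch of stable-prefetch replenishment batching.
final class Replenisher {

    final int limit;
    private final LongConsumer upstream;   // stands in for Flow.Subscription::request
    private int consumed;

    Replenisher(int prefetch, LongConsumer upstream) {
        this.limit = prefetch - (prefetch >> 2);   // 75% of prefetch
        this.upstream = upstream;
    }

    // Called by the single-threaded drain loop once per item taken from the queue.
    void itemConsumed() {
        int c = consumed + 1;
        if (c == limit) {
            consumed = 0;
            upstream.accept(c);                    // request a whole batch at once
        } else {
            consumed = c;
        }
    }
}
```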

The coordinator

Since there is no primary source in this orderedMerge() operator, it acts somewhat like a plain source of events. Therefore, we have to implement it on top of Flow.Subscription to interact with the downstream. For convenience in this blog, we'll define the operator to take a variable number of Flow.Publisher sources (which at runtime always ends up as a fixed-size array):


@SafeVarargs
public static <T> Flow.Publisher<T> orderedMerge(
        Comparator<? super T> comparator, 
        int prefetch,
        Flow.Publisher<? extends T>... sources) {
    return new OrderedMergePublisher<>(sources, prefetch, comparator);
}

final class OrderedMergePublisher<T> implements Flow.Publisher<T> {

    final Flow.Publisher<? extends T>[] sources;

    final int prefetch;

    final Comparator<? super T> comparator;

    OrderedMergePublisher(
            Flow.Publisher<? extends T>[] sources,
            int prefetch,
            Comparator<? super T> comparator) {
        this.sources = sources;
        this.prefetch = prefetch;
        this.comparator = comparator;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> s) {
         // TODO implement
    }
}


The boilerplate of writing an operator is nothing special: save the parameters to be used by the implementation. We allow customization via a Comparator. If T is self-comparable, you can use Comparator.naturalOrder() from Java itself.

The coordinator implementation has to hold onto the inner OrderedMergeInnerSubscribers for mass cancellation support, subscribe them to the sources and work out the emissions from them. Let's see the non-exciting parts of it:

static final class OrderedMergeSubscription<T>
extends AtomicInteger implements Flow.Subscription {

    final Flow.Subscriber<? super T> downstream;

    final OrderedMergeInnerSubscriber<T>[] subscribers;

    final Comparator<? super T> comparator;

    final Object[] values;

    static final Object DONE = new Object();

    Throwable error;

    boolean cancelled;

    long requested;

    long emitted;

    // -------------------------------------------------

    static final VarHandle ERROR;
 
    static final VarHandle CANCELLED;
 
    static final VarHandle REQUESTED;

    static {
        MethodHandles.Lookup lk = MethodHandles.lookup();
        try {
            ERROR = lk.findVarHandle(
                OrderedMergeSubscription.class, "error", Throwable.class);
            CANCELLED = lk.findVarHandle(
                OrderedMergeSubscription.class, "cancelled", boolean.class);
            REQUESTED = lk.findVarHandle(
                OrderedMergeSubscription.class, "requested", long.class);
        } catch (Throwable ex) {
            throw new InternalError(ex);
        }
    }

    OrderedMergeSubscription(
            Flow.Subscriber<? super T> downstream,
            Comparator<? super T> comparator,
            int prefetch,
            int n) {
        this.downstream = downstream;
        this.comparator = comparator;
        this.subscribers = new OrderedMergeInnerSubscriber[n];
        for (int i = 0; i < n; i++) {
            this.subscribers[i] = new OrderedMergeInnerSubscriber<>(this, prefetch);
        }
        this.values = new Object[n];
    }

    void subscribe(Flow.Publisher<? extends T>[] sources) {
        // TODO implement
    }
     
    @Override
    public void request(long n) {
        // TODO implement
    }

    @Override
    public void cancel() {
        // TODO implement
    }

    void drain() {
        // TODO implement
    }

    void onInnerError(OrderedMergeInnerSubscriber<T> sender, Throwable ex) {
        // TODO implement
    }

    void updateError(Throwable ex) {
        // TODO implement
    }
}

We have a couple of fields and methods here; some of them should be familiar by their naming and intended purpose:


  • downstream will receive the ordered sequence of items
  • subscribers holds onto the fixed set of inner OrderedMergeInnerSubscribers, each will be subscribed to a particular Flow.Publisher and the total number of them won't ever change in this operator.
  • comparator will compare elements from various sources
  • values holds onto the next available value from each source. This allows the merging algorithm to work with queues that don't support peek() (such as RxJava 2's own queue implementations) and otherwise has nice properties such as locality, avoiding access to the internals of the inner subscribers' queues and avoiding the overhead of a peek()-poll() pair all the time.
  • The DONE constant will indicate a particular source has no further elements and can be ignored (without looking at its subscriber).
  • error will gather the errors signalled by the sources, to be emitted together once all sources have terminated. There is an ERROR VarHandle for concurrent access to this field.
  • cancelled indicates the downstream has issued a cancel() call to stop the flow. The CANCELLED VarHandle will allow us to use compareAndSet() to cancel at most once.
  • requested accumulates the requests done by the downstream via its REQUESTED VarHandle.
  • emitted counts how many items were emitted and will be compared against requested to detect when to pause emitting.
There is no separate done indicator field because we will deduce this state by detecting that all values entries are marked as DONE.

Now let's see the shorter methods implemented:


    // ...

    void subscribe(Flow.Publisher<? extends T>[] sources) {
        for (int i = 0; i < sources.length; i++) {
            sources[i].subscribe(subscribers[i]);
        }
    }
     
    @Override
    public void request(long n) {
        if (n <= 0L) {
            updateError(new IllegalArgumentException("positive request amount required"));
        } else {
            for (;;) {
                long current = (long)REQUESTED.getAcquire(this);
                long next = current + n;
                if (next < 0L) {
                    next = Long.MAX_VALUE;
                }
                if (REQUESTED.compareAndSet(this, current, next)) {
                    break;
                }
            }
        }
        drain();
    }

    @Override
    public void cancel() {
        if (CANCELLED.compareAndSet(this, false, true)) {
            for (OrderedMergeInnerSubscriber<T> inner : subscribers) {
                inner.cancel();
            }

            if (getAndIncrement() == 0) {
                Arrays.fill(values, null);

                for (OrderedMergeInnerSubscriber<T> inner : subscribers) {
                    inner.queue.clear();
                }
            }
        }
    }

    void onInnerError(OrderedMergeInnerSubscriber<T> sender, Throwable ex) {
        updateError(ex);
        sender.done = true;
        drain();
    }

    void updateError(Throwable ex) {
        for (;;) {
            Throwable current = (Throwable)ERROR.getAcquire(this);
            Throwable next;
            if (current == null) {
                next = ex;
            } else {
                next = new Throwable();
                next.addSuppressed(current);
                next.addSuppressed(ex);
            }
            if (ERROR.compareAndSet(this, current, next)) {
                break;
            }
        }
    }

    void drain() {
        // TODO implement
    }

}


The subscribe() method simply subscribes to all sources with the prepared array of OrderedMergeInnerSubscribers. The cancel() method cancels all inner subscribers and then enters a half-open drain mode where both the values array and the queue of each inner subscriber are cleared in order to help the GC. Both request() and updateError() should be familiar from the previous post of the series.
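As a reminder of those two patterns, here is a minimal sketch using java.util.concurrent.atomic classes instead of VarHandles (an assumption made to keep the example short); RequestErrorSketch, addRequest() and the "Multiple errors" wrapper message are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helpers mirroring request() and updateError() with plain atomics.
final class RequestErrorSketch {
    final AtomicLong requested = new AtomicLong();
    final AtomicReference<Throwable> error = new AtomicReference<>();

    // Adds n (> 0) to requested, capping at Long.MAX_VALUE ("unbounded") on overflow.
    void addRequest(long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0L) {
                next = Long.MAX_VALUE;
            }
            if (requested.compareAndSet(current, next)) {
                return;
            }
        }
    }

    // Keeps the first error as is; a later one is combined with the current one
    // into a fresh wrapper via addSuppressed().
    void updateError(Throwable ex) {
        for (;;) {
            Throwable current = error.get();
            Throwable next;
            if (current == null) {
                next = ex;
            } else {
                next = new Throwable("Multiple errors");
                next.addSuppressed(current);
                next.addSuppressed(ex);
            }
            if (error.compareAndSet(current, next)) {
                return;
            }
        }
    }
}
```

The CAS loops retry whenever another thread changed the field between the read and the compareAndSet(), so no update is lost.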

What's left is the drain() logic itself.


void drain() {
    if (getAndIncrement() != 0) {
        return;
    }

    int missed = 1;
    Flow.Subscriber<? super T> downstream = this.downstream;

    Comparator<? super T> comparator = this.comparator;

    OrderedMergeInnerSubscriber<T>[] subscribers = this.subscribers;
    int n = subscribers.length;

    Object[] values = this.values;

    long e = emitted;

    for (;;) {
         long r = (long)REQUESTED.getAcquire(this);

         for (;;) {
              // TODO implement
         }

         emitted = e;
         missed = addAndGet(-missed);
         if (missed == 0) {
             break;
         }
    }
}


We start out with the usual drain-exclusion logic: transitioning the work-in-progress counter of this (by extending AtomicInteger) from zero to one allows one thread to enter and perform the emissions. We load the frequently accessed components into local variables and run an almost-classical for-loop with missed accounting to determine when to leave the loop.

Note that in the loop, after reading the current request amount we don't have the usual while (e != r) and if (e == r) cases. The reason for this is that we can have one shared loop for both the cases when backpressure is applied and when there are no further source items to merge and can terminate the sequence without a request from downstream.
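The missed-accounting exit condition is easiest to see when drain() is re-entered synchronously (say, an onNext that calls request() on the same thread). The following hypothetical MissedDrainDemo is a sketch, not part of the operator: it shows that such a nested call does not recurse but only bumps the counter, which makes the running loop perform one more pass.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of the work-in-progress (missed) counter pattern used by drain().
final class MissedDrainDemo extends AtomicInteger {
    int passes;        // how many times the loop body ran
    int depth;         // current nesting depth of the loop body
    int maxDepth;      // deepest nesting observed
    boolean reentered; // lets the first pass trigger one nested drain() call

    void drain() {
        if (getAndIncrement() != 0) {
            return; // another drain is running; it will notice our increment
        }
        int missed = 1;
        for (;;) {
            depth++;
            maxDepth = Math.max(maxDepth, depth);
            passes++;
            if (!reentered) {
                reentered = true;
                drain(); // e.g., onNext calling request() synchronously
            }
            depth--;
            missed = addAndGet(-missed);
            if (missed == 0) {
                break; // no new work arrived while we were busy
            }
        }
    }
}
```

A single drain() call ends up running two passes at a nesting depth of one, and the counter is back at zero afterwards.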


// inner for(;;) {

if ((boolean)CANCELLED.getAcquire(this)) {
    Arrays.fill(values, null);

    for (OrderedMergeInnerSubscriber<T> inner : subscribers) {
        inner.queue.clear();
    }
    return;
}

int done = 0;
int nonEmpty = 0;
for (int i = 0; i < n; i++) {
    Object o = values[i];
    if (o == DONE) {
        done++;
        nonEmpty++;
    } else
    if (o == null) {
        boolean innerDone = subscribers[i].done;
        o = subscribers[i].queue.poll();
        if (o != null) {
            values[i] = o;
            nonEmpty++;
        } else if (innerDone) {
            values[i] = DONE;
            done++;
            nonEmpty++;
        }
    } else {
        nonEmpty++;
    }
}


The first part checks if there was a cancellation from downstream. If so, we clear the internal state of the coordinator and each queue, then quit. Next, for each subscriber of a source, we poll the next available item into the common values array if there is not already an item available there. In addition, we count how many of those entries indicate a completed source and how many of them hold something (either an actual item or the DONE marker).

Note that checking a source for completion has to happen before polling for the next item from its queue. As explained before on this blog, since Flow.Subscriber methods are invoked in a strict protocol order where an onComplete always happens after any previous onNext calls, if we detect done to be true and then get a null from the queue, we know there can't be any further items from that source. Otherwise, polling and seeing an empty queue first, then checking done, opens a window in which the source could quickly produce items and complete between these two checks; we would then mark the source DONE and lose those items.

Next, we handle the overall state of the operator:

if (done == n) {
    Throwable ex = (Throwable)ERROR.getAcquire(this);
    if (ex == null) {
        downstream.onComplete();
    } else {
        downstream.onError(ex);
    }
    return;
}

if (nonEmpty != n || e != r) {
    break;
}


If all of the entries turn out to be DONE, we have exhausted all sources and can terminate the downstream accordingly (considering whether there was any error along the way). If not all values slots have something or the downstream is not ready to receive an item, we break out of this inner loop and the outer loop will see whether more work has to be done or not.

Finally, we find the smallest item from the available values:


    T min = null;
    int minIndex = -1;

    int i = 0;
    for (Object o : values) {
        if (o != DONE) {
            if (min == null || comparator.compare(min, (T)o) > 0) {
                min = (T)o;
                minIndex = i;
            }      
        }
        i++;
    }

    values[minIndex] = null;

    downstream.onNext(min);

    e++;
    subscribers[minIndex].request(1);

} // of the inner for (;;)


Once we know we can emit an item, we find the smallest one among the non-DONE entries along with its index. Since we checked before that not all entries are DONE, min must end up non-null and minIndex non-negative. We clear the appropriate values entry, indicating that the next cycle should poll for more items from that particular source, emit the found minimum item, increment the emission counter and signal the winning source to produce one more item.
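To see the selection logic in isolation, here is a single-threaded simulation of the values-row merge, assuming every source is a pre-filled queue that has already completed (so done is always true) and that the downstream requested an unbounded amount. OrderedMergeSimulation and merge() are hypothetical names; the real operator additionally has to deal with cancellation, errors and backpressure.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded simulation of the orderedMerge() drain step.
final class OrderedMergeSimulation {
    private static final Object DONE = new Object();

    @SuppressWarnings("unchecked")
    static <T> List<T> merge(Comparator<? super T> comparator, List<? extends Queue<T>> sources) {
        int n = sources.size();
        Object[] values = new Object[n];
        List<T> out = new ArrayList<>();
        for (;;) {
            int done = 0;
            for (int i = 0; i < n; i++) {
                if (values[i] == null) {
                    // every source has completed here, so an empty queue means DONE
                    T o = sources.get(i).poll();
                    values[i] = o != null ? o : DONE;
                }
                if (values[i] == DONE) {
                    done++;
                }
            }
            if (done == n) {
                return out; // downstream.onComplete()
            }
            T min = null;
            int minIndex = -1;
            for (int i = 0; i < n; i++) {
                Object o = values[i];
                if (o != DONE && (min == null || comparator.compare(min, (T) o) > 0)) {
                    min = (T) o;
                    minIndex = i;
                }
            }
            values[minIndex] = null; // next round polls this source again
            out.add(min);            // downstream.onNext(min)
        }
    }
}
```

Merging the queues [1, 4, 7], [2, 5, 8] and [3, 6, 9] with the natural order yields 1 through 9. Because of the strict > 0 comparison, equal items are taken from the lower-indexed source first.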

Conclusion

The orderedMerge() operator shown in this post is perhaps one of the shortest and most comprehensible ones, even considering the lack of infrastructure with Java 9 Flows (i.e., the cancelled indicator). The queue-drain approach present in many of the typical reactive operators can be observed here as well.

Since the operator collects a row of available values, it can be turned into a zip() operator relatively easily:


  • done != 0 indicates that one or more sources ran out of items, thus the sequence can be completed. Note that the non-done inner subscribers have to be cancelled and cleared before the downstream gets the terminal event.
  • instead of the loop that compares items, one copies the values array, clears the original one, applies a function to the copy and emits the result of that function call.
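The two bullets can be sketched with the same single-threaded simulation approach, assuming pre-filled, already completed queues in place of the inner subscribers. ZipSimulation and its zipper function, which receives the copied row as an Object[], are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical single-threaded sketch of the value-row logic turned into zip().
final class ZipSimulation {
    private static final Object DONE = new Object();

    static <R> List<R> zip(List<? extends Queue<?>> sources, Function<Object[], R> zipper) {
        int n = sources.size();
        Object[] values = new Object[n];
        List<R> out = new ArrayList<>();
        for (;;) {
            int done = 0;
            for (int i = 0; i < n; i++) {
                if (values[i] == null) {
                    Object o = sources.get(i).poll();
                    values[i] = o != null ? o : DONE;
                }
                if (values[i] == DONE) {
                    done++;
                }
            }
            if (done != 0) {
                // one source ran out: the real operator would cancel the others here
                for (Queue<?> q : sources) {
                    q.clear();
                }
                return out; // downstream.onComplete()
            }
            Object[] row = values.clone(); // copy the row...
            Arrays.fill(values, null);     // ...and clear the original for the next round
            out.add(zipper.apply(row));    // downstream.onNext(zipper(row))
        }
    }
}
```

Zipping [1, 2, 3] with [10, 20] by summing each row produces [11, 22]. The leftover 3 is dropped once the shorter source is exhausted.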


You can also turn it into a (rather clumsy) merge() operator that uses a round-robin collection strategy to pick the item to be emitted: an index (also saved into a field for subsequent drain rounds) that indicates which next slot to consider for emission if nonEmpty != 0, skipping over the DONE entries along the way.
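A similar hypothetical simulation of that round-robin variant, again over pre-filled and already completed queues: the index survives across rounds (it would be a field in the real operator), and DONE slots are skipped. RoundRobinMergeSimulation is an illustrative name only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded sketch of a round-robin merge over the value row.
final class RoundRobinMergeSimulation {
    private static final Object DONE = new Object();

    @SuppressWarnings("unchecked")
    static <T> List<T> merge(List<? extends Queue<? extends T>> sources) {
        int n = sources.size();
        Object[] values = new Object[n];
        List<T> out = new ArrayList<>();
        int index = 0; // next slot to consider; kept across drain rounds
        for (;;) {
            int done = 0;
            for (int i = 0; i < n; i++) {
                if (values[i] == null) {
                    Object o = sources.get(i).poll();
                    values[i] = o != null ? o : DONE;
                }
                if (values[i] == DONE) {
                    done++;
                }
            }
            if (done == n) {
                return out;
            }
            // starting at index, emit the first slot holding an actual item, skipping DONE
            for (int k = 0; k < n; k++) {
                int j = (index + k) % n;
                if (values[j] != DONE) {
                    out.add((T) values[j]);
                    values[j] = null;
                    index = (j + 1) % n;
                    break;
                }
            }
        }
    }
}
```

For the queues [1, 2], [10] and [100, 200, 300], this emits 1, 10, 100, 2, 200, 300.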

However, there is one likely problem that troubles such value-collecting-and-emitting operators: they perform what Stephane Maldini, Project Reactor lead, once said our RxJava 2 / Reactor-Core 3 algorithms do, namely thread-stealing. Given all the sources, the thread of one of them will end up collecting and emitting the smallest item on behalf of all the other sources, while that thread itself likely won't make progress on its own work until it finds a small pause in the onslaught of source items so that the drain loop can quit.

This may be undesirable at times and there is a solution for it. To get there, we will investigate how thread switching in mid-flow can be implemented within the Java 9 Flow API in the next post.

Saturday, September 9, 2017

Rewriting RxJava with Kotlin Coroutines?

Introduction


Someone influential stated that RxJava should be rewritten with Kotlin Coroutines. I haven't seen any attempt at it as of now, and declaring such a thing (not) worth it without actually trying is irresponsible.

As we saw in the earlier post and the response in the comment section, following up on the imperative-reactive promise leads to some boilerplate and questionable cancellation management, and the idiomatic Kotlin/Coroutine enhancement suggested is to ... factor out the imperative control structures into common routines and have the user specify lambda callback(s); thus it can become declarative-reactive, just like RxJava interpreted from a higher level viewpoint. Kind of defeats one of the premises in my understanding.

This doesn't diminish the power of coroutine-based abstraction but certainly implies a relevant question: who is supposed to write these abstract operators?

One possible answer is, of course, library writers who not only have experience with abstracting away control structures but perhaps wield deeper knowledge about how the coroutine infrastructure can be utilized in certain complicated situations.

If this assumption of mine is true, that somewhat defeats another premise of coroutines: the end user will likely have to stick to writing suspendable functionals and discover operators provided by a library most of the time.

So what's mainly left is to see if implementing a declarative-reactive library on top of coroutines gives benefits to the library developer (i.e., ease of writing) over hand crafted state-machines and (reasonable) performance to the user of the library itself.

The library implementation


Perhaps one of the more attractive properties of RxJava is the deferred, lazy execution of a reactive flow (cold). One sets up a template of transformations and issues a subscribe() call to begin execution. In contrast, CompletableFuture and imperative Coroutines can be thought of as eager executions: in order to retry them, one has to recreate the whole chain, plus their execution may be ongoing while one is still busy applying operators on top of them.

Base interfaces


Since the former structure is more enabling at little to no overhead, we'll define our base types as follows:


interface CoFlow<out T> {
    suspend fun subscribe(consumer: CoConsumer<T>)
}


The main interface, CoFlow, matches the usual pattern of the Reactive-Streams Publisher.

interface CoConsumer<in T> {

    suspend fun onSubscribe(connection: CoConnection)

    suspend fun onNext(t: T)

    suspend fun onError(t: Throwable)

    suspend fun onComplete()
}


The consumer type, CoConsumer, is also matching the Reactive-Streams Subscriber pattern.

interface CoConnection {
    suspend fun close()
}


The final type, CoConnection, is responsible for cancelling a flow. Unlike the Reactive-Streams Subscription, there is no request() method because we will follow up on the non-blocking suspension promise of the coroutines: the sender will be suspended if the receiver is not in the position to receive, thus there should be no need for request accounting as the state machine generated by the compiler will implicitly do it for us.

Those with deeper understanding of how cancellation works with coroutines may object to this connection object. Indeed, there are probably better ways of including cancellation support, however, my limited understanding of the coroutine infrastructure didn't yield any apparent concept-match between the two. Suggestions welcome.

Entering the CoFlow world

Perhaps the most basic way of creating a flow of values is the Just(T) operator that when subscribed to, emits its single item followed by a completion signal. Since we don't have to deal with a backpressure state machine, this should be relatively short to write:


class Just<out T>(private val value: T) : CoFlow<T> {
    override suspend fun subscribe(consumer: CoConsumer<T>) {
        consumer.onSubscribe(???)
        consumer.onNext(value)
        consumer.onComplete()
    }
}

In order to allow the downstream to indicate cancellation, we have to send something along onSubscribe. Since coroutines appear as synchronous execution, we would have the same synchronous cancellation problem that the Reactive-Streams Subscription (and RxJava before it) solves: inversion of control by sending down something cancellable first, then checking if the consumer had enough.


class BooleanConnection : CoConnection {

   @Volatile var cancelled : Boolean = false

   override suspend fun close() {
       cancelled = true
   }
}


Which we now can use with Just(T):

class Just<out T>(private val value: T) : CoFlow<T> {
    override suspend fun subscribe(consumer: CoConsumer<T>) {
        val conn = BooleanConnection()
        consumer.onSubscribe(conn)

        if (conn.cancelled) {
            return
        }
        consumer.onNext(value)

        if (conn.cancelled) {
            return
        }
        consumer.onComplete()
    }
}

Since everything is declared suspend, we should have no problem interacting with a downstream operator that suspends execution in case of immediate backpressure.

Let's see a source that emits multiple items, but for an (expectable) twist, we implement an uncommon source: Chars(String) which emits the characters of a string as Ints:


class Chars(private val string: String) : CoFlow<Int> {
    override suspend fun subscribe(consumer: CoConsumer<Int>) {
        val conn = BooleanConnection()
        consumer.onSubscribe(conn)
  
        for (v in 0 until string.length) {
            if (conn.cancelled) {
                return
            }
            consumer.onNext(string[v].toInt())
        }
        if (conn.cancelled) {
            return
        }
        consumer.onComplete()
    }
}

And lastly for this subsection, we will implement FromIterable(T):


class FromIterable<T>(private val source: Iterable<T>) : CoFlow<T> {
    override suspend fun subscribe(consumer: CoConsumer<T>) {
        val conn = BooleanConnection()
        consumer.onSubscribe(conn)
  
        for (v in source) {
            if (conn.cancelled) {
                return
            }
            consumer.onNext(v)
        }
        if (conn.cancelled) {
            return
        }
        consumer.onComplete()
    }
}


So far, these sources look pretty much like how the non-backpressured RxJava 2 Observable is implemented. I'm sure there are more concise ways of expressing them; unfortunately, I have only limited knowledge about Kotlin's syntax improvements over Java. However, since I think the blog's audience is mainly Java programmers, something familiar-looking should be "less alien" at this point.
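For Java readers, the same inversion-of-control pattern can be sketched synchronously in plain Java, which is roughly the shape of a non-backpressured Observable source. SyncConnection, SyncConsumer, BooleanSyncConnection and FromIterableSync are hypothetical stand-ins for CoConnection, CoConsumer, BooleanConnection and FromIterable, without any suspension.

```java
// Hypothetical synchronous counterparts of CoConnection/CoConsumer (no suspension).
interface SyncConnection {
    void close();
}

interface SyncConsumer<T> {
    void onSubscribe(SyncConnection connection);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}

final class BooleanSyncConnection implements SyncConnection {
    volatile boolean cancelled;

    @Override
    public void close() {
        cancelled = true;
    }
}

// Mirrors FromIterable: hand out the connection first, then check it before every signal.
final class FromIterableSync<T> {
    private final Iterable<T> source;

    FromIterableSync(Iterable<T> source) {
        this.source = source;
    }

    void subscribe(SyncConsumer<? super T> consumer) {
        BooleanSyncConnection conn = new BooleanSyncConnection();
        consumer.onSubscribe(conn);
        for (T v : source) {
            if (conn.cancelled) {
                return;
            }
            consumer.onNext(v);
        }
        if (!conn.cancelled) {
            consumer.onComplete();
        }
    }
}
```

A consumer that closes the connection from within its second onNext receives exactly two items and no completion signal.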

Transformations

What is the most common transformation in the reactive world? Mapping, of course! Therefore, let's see what the instance extension method Map(T -> R) looks like.


suspend fun <T, R> CoFlow<T>.map(mapper: suspend (T) -> R): CoFlow<R> {
    val source = this
    
    return object: CoFlow<R> {
        override suspend fun subscribe(consumer: CoConsumer<R>) {

            source.subscribe(object: CoConsumer<T> {

                var upstream: CoConnection? = null
                var done: Boolean = false

                override suspend fun onSubscribe(conn: CoConnection) {
                    upstream = conn
                    consumer.onSubscribe(conn)
                }

                override suspend fun onNext(t: T) {
                    val v: R;
                    try {
                        v = mapper(t)
                    } catch (ex: Throwable) {
                        done = true
                        upstream!!.close()
                        consumer.onError(ex)
                        return
                    }
                    consumer.onNext(v)
                }

                override suspend fun onError(t: Throwable) {
                    if (!done) {
                        consumer.onError(t)
                    }
                }

                override suspend fun onComplete() {
                    if (!done) {
                        consumer.onComplete()
                    }
                }
            })
        }
    }
}

Perhaps what I envy most in Kotlin is the extension method support. I can only hope for it in Java now that Oracle is switching to a 6-month feature release cycle. The val source = this may seem odd to a Kotlin developer; Kotlin's qualified this expression (this@map) could access the outer receiver from within the anonymous object (object: CoFlow<R>) instead. Note also the suspend (T) -> R signature: we will, of course, mainly support suspendable functions.

The logic, again, resembles RxJava's own map() implementation. We save and forward the upstream connection instance to the consumer as there is no real need to intercept the close call. We apply the upstream's value to the mapper function and forward the result to the consumer. If the mapper function crashes, we stop the upstream and emit the error. This may happen for the very last item, and the upstream may still emit a regular onComplete(), which the done flag suppresses, just like with Reactive-Streams.

The next common operator is Filter(T):


suspend fun <T> CoFlow<T>.filter(predicate: suspend (T) -> Boolean): CoFlow<T> {
    val source = this
    
    return object: CoFlow<T> {
        override suspend fun subscribe(consumer: CoConsumer<T>) {
            source.subscribe(object: CoConsumer<T> {

                var upstream: CoConnection? = null
                var done: Boolean = false

                override suspend fun onSubscribe(conn: CoConnection) {
                    upstream = conn
                    consumer.onSubscribe(conn)
                }

                override suspend fun onNext(t: T) {
                    val v: Boolean;
                    try {
                        v = predicate(t)
                    } catch (ex: Throwable) {
                        done = true
                        upstream!!.close()
                        consumer.onError(ex)
                        return
                    }
                    if (v) {
                        consumer.onNext(t)
                    }
                }

                override suspend fun onError(t: Throwable) {
                    if (!done) {
                        consumer.onError(t)
                    }
                }

                override suspend fun onComplete() {
                    if (!done) {
                        consumer.onComplete()
                    }
                }
            })
        }
    }
}

I guess the pattern is now obvious. Let's see a couple of other operators.

Take

suspend fun <T> CoFlow<T>.take(n: Long): CoFlow<T> {

// ...

     var remaining = n

     override suspend fun onNext(t: T) {
         val r = remaining
         if (r != 0L) {
             val next = r - 1
             remaining = next
             consumer.onNext(t)
             if (next == 0L) {
                 upstream!!.close()
                 consumer.onComplete()
             }
         }
     }

// ...

     override suspend fun onComplete() {
         if (remaining != 0L) {
             consumer.onComplete()
         }
     }
}

Skip


suspend fun <T> CoFlow<T>.skip(n: Long): CoFlow<T> {

// ...

     var remaining = n

     override suspend fun onNext(t: T) {
         val r = remaining
         if (r == 0L) {
             consumer.onNext(t)
         } else {
             remaining = r - 1
         }
     }

     // ...
}

Collect


suspend fun <T, R> CoFlow<T>.collect(
         collectionSupplier: suspend () -> R,
         collector: suspend (R, T) -> Unit
): CoFlow<R> {
    val source = this
    
    return object: CoFlow<R> {

        override suspend fun subscribe(consumer: CoConsumer<R>) {

            val coll : R

            try {
                coll = collectionSupplier()
            } catch (ex: Throwable) {
                consumer.onSubscribe(BooleanConnection())
                consumer.onError(ex)
                return
            }                     

            source.subscribe(object: CoConsumer<T> {

                var upstream: CoConnection? = null
                var done: Boolean = false
                val collection: R = coll

                override suspend fun onSubscribe(conn: CoConnection) {
                    upstream = conn
                    consumer.onSubscribe(conn)
                }

                override suspend fun onNext(t: T) {
                    try {
                        collector(collection, t)
                    } catch (ex: Throwable) {
                        done = true
                        upstream!!.close()
                        consumer.onError(ex)
                        return
                    }
                }

                override suspend fun onError(t: Throwable) {
                    if (!done) {
                        consumer.onError(t)
                    }
                }

                override suspend fun onComplete() {
                    if (!done) {
                        consumer.onNext(collection)
                        consumer.onComplete()
                    }
                }
            })
         
        }
    }
}


Sum


suspend fun <T: Number> CoFlow<T>.sumInt(): CoFlow<Int> {


    // ...
    var sum: Int = 0
    var hasValue: Boolean = false

    override suspend fun onNext(t: T) {
        if (!hasValue) {
            hasValue = true
        }
        sum += t.toInt()
    }

    // ...

    override suspend fun onComplete() {
        if (hasValue) {
            consumer.onNext(sum)
        }
        consumer.onComplete()
    }
}

Max


suspend fun <T: Comparable<T>> CoFlow<T>.max(): CoFlow<T> {

    // ...
    var value: T? = null

    override suspend fun onNext(t: T) {
        val v = value
        if (v == null || v < t) {
            value = t
        }               
    }

    // ...

    override suspend fun onComplete() {
        val v = value
        if (v != null) {
            consumer.onNext(v)
        }
        consumer.onComplete()
    }
}

Flatten


suspend fun <T, R> CoFlow<T>.flatten(mapper: suspend (T) -> Iterable<R>): CoFlow<R> {

    // ...

    override suspend fun onNext(t: T) {

        try {
            for (v in mapper(t)) {
                consumer.onNext(v)
            }
        } catch (ex: Throwable) {
            done = true
            upstream!!.close()
            consumer.onError(ex)
            return
        }
    }

}

Concat


suspend fun <T> CoFlow<T>.concat(vararg sources: CoFlow<T>): CoFlow<T> {
    return object: CoFlow<T> {
        suspend override fun subscribe(consumer: CoConsumer<T>) {
            val closeToken = SequentialConnection()
            consumer.onSubscribe(closeToken)
            launch(Unconfined) {
                val ch = Channel<Unit>(1);

                for (source in sources) {

                    source.subscribe(object: CoConsumer<T> {
                        suspend override fun onSubscribe(conn: CoConnection) {
                            closeToken.replace(conn)
                        }

                        suspend override fun onNext(t: T) {
                            consumer.onNext(t)
                        }

                        suspend override fun onError(t: Throwable) {
                            consumer.onError(t)
                            ch.close()
                        }

                        suspend override fun onComplete() {
                            ch.send(Unit)
                        }

                    })

                    try {
                        ch.receive()
                    } catch (ex: Throwable) {
                        // ignored
                        return@launch
                    }
                }

                consumer.onComplete()
            }
        }
    }
}


Before concat, we did not have to interact with the cancellation mechanism of the coroutine world. Here, if one wants to avoid unbounded recursion due to switching to the next source, some trampolining is necessary. The launch(Unconfined), as I understand it, should do just that. Note that the returned Job is not joined into the CoConnection rail, partly to avoid writing a CompositeCoConnection, partly because I don't know how such a contextual component should generally interact with our CoFlow setup. Suggestions welcome.

As for the use of Channel(1), I encountered two problems:

  • I don't know how to hold off the loop otherwise as suspendCoroutine { } doesn't allow its block to be suspendable and we have subscribe() as suspendable.
  • The plain Channel() is a so-called rendezvous primitive where send() and receive() have to meet. Unfortunately, a synchronously executed CoFlow will livelock because send() suspends - because there is no matching receive() call on the same thread - which would resume receive(). A one element channel solved this.
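The second bullet can be illustrated by analogy with java.util.concurrent queues rather than coroutine channels. This is an assumption: blocking threads and suspending coroutines differ, but the hand-off problem is the same. On a single thread, a SynchronousQueue (a rendezvous) cannot accept an item because nobody is waiting to take it, whereas a one-element ArrayBlockingQueue lets the sender proceed and the same thread receive afterwards. RendezvousVsBuffer is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical analogy: a same-thread "send, then receive" over a java.util.concurrent queue.
final class RendezvousVsBuffer {
    // Returns the received item, or null if the hand-off could not happen without waiting.
    static String sendThenReceive(BlockingQueue<String> channel) {
        // offer() is the non-waiting form of send(); a blocking put() would hang forever
        // on a rendezvous queue because the receive below is never reached
        if (!channel.offer("done")) {
            return null;
        }
        return channel.poll();
    }
}
```

Passing a new SynchronousQueue yields null, while a new ArrayBlockingQueue with capacity 1 yields "done".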


The (simpler) SequentialConnection is implemented as follows:


class SequentialConnection : AtomicReference<CoConnection?>(), CoConnection {

    object Disconnected : CoConnection {
        suspend override fun close() {
        }
    }

    suspend fun replace(conn: CoConnection?) : Boolean {
        while (true) {
            val a = get()
            if (a == Disconnected) {
                conn?.close()
                return false
            }
            if (compareAndSet(a, conn)) {
                return true
            }
        }
    }

    suspend override fun close() {
        getAndSet(Disconnected)?.close()
    }
}

It uses the same atomics logic as the SequentialDisposable in RxJava.
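For comparison, here is a Java sketch of the same replace/close atomics, following the SequentialConnection code above; SimpleConnection and SequentialConnectionSketch are hypothetical names. Note that replace() does not close the previous connection, while a replace() after close() immediately closes the incoming one.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for CoConnection.
interface SimpleConnection {
    void close();
}

// Hypothetical Java counterpart of SequentialConnection.
final class SequentialConnectionSketch extends AtomicReference<SimpleConnection>
        implements SimpleConnection {

    // Terminal marker: once swapped in, every later connection is closed on arrival.
    static final SimpleConnection DISCONNECTED = () -> { };

    boolean replace(SimpleConnection next) {
        for (;;) {
            SimpleConnection current = get();
            if (current == DISCONNECTED) {
                if (next != null) {
                    next.close();
                }
                return false;
            }
            if (compareAndSet(current, next)) {
                return true; // the previous connection is intentionally not closed
            }
        }
    }

    @Override
    public void close() {
        SimpleConnection current = getAndSet(DISCONNECTED);
        if (current != null) {
            current.close();
        }
    }
}
```

Closing a second time only closes the DISCONNECTED marker, which is a no-op.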

Leaving the reactive world

Eventually, we'd like to return to the plain coroutine world and resume our imperative code section after a CoFlow has run. One case is to actually ignore any emission and just wait for the CoFlow to terminate. Let's write an await() operator for that:


suspend fun <T> CoFlow<T>.await() {
    val source = this

    val ch = Channel<T>(1)

    source.subscribe(object : CoConsumer<T> {
        var upstream : CoConnection? = null

        suspend override fun onSubscribe(conn: CoConnection) {
            upstream = conn
        }

        suspend override fun onNext(t: T) {
        }

        suspend override fun onError(t: Throwable) {
            ch.close(t)
        }

        suspend override fun onComplete() {
            ch.close()
        }
    })

    try {
        ch.receive()
    } catch (ex: ClosedReceiveChannelException) {
        // expected closing
    }
}

The same Channel(1) trick is used here. Again, I don't know how to attach the CoConnection to the caller's context.

Sometimes, we are interested in the first or last item generated through the CoFlow. Let's see how to get to the first item via an awaitFirst():


suspend fun <T> CoFlow<T>.awaitFirst() : T {
    val source = this

    val ch = Channel<T>(1)

    source.subscribe(object : CoConsumer<T> {
        var upstream : CoConnection? = null
        var done : Boolean = false

        suspend override fun onSubscribe(conn: CoConnection) {
            upstream = conn
        }

        suspend override fun onNext(t: T) {
            done = true
            upstream!!.close()
            ch.send(t)
        }

        suspend override fun onError(t: Throwable) {
            if (!done) {
                ch.close(t)
            }
        }

        suspend override fun onComplete() {
            if (!done) {
                ch.close(NoSuchElementException())
            }
        }
    })

    return ch.receive()
}
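The text mentions both the first and the last item, but only awaitFirst() is shown. A possible awaitLast() could look like the minimal sketch below. To stay self-contained, it redeclares stand-ins for CoFlow, CoConsumer and CoConnection shaped after the calls used in this post (assumed, not the original definitions), and it swaps the Channel(1) trick for the standard library's suspendCoroutine. The range() source and the runSync() runner are hypothetical helpers that only work for sources that never actually suspend.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Stand-ins shaped like the CoFlow types of this post (assumed, not the originals).
interface CoConnection { suspend fun close() }

interface CoConsumer<in T> {
    suspend fun onSubscribe(conn: CoConnection)
    suspend fun onNext(t: T)
    suspend fun onError(t: Throwable)
    suspend fun onComplete()
}

interface CoFlow<out T> { suspend fun subscribe(consumer: CoConsumer<T>) }

// Sketch: remember the latest item and hand it over once the flow completes.
@Suppress("UNCHECKED_CAST")
suspend fun <T> CoFlow<T>.awaitLast(): T = suspendCoroutine { cont ->
    val source = this
    val consumer = object : CoConsumer<T> {
        var hasValue = false
        var last: T? = null

        // Nothing to close early: every item has to be seen anyway.
        override suspend fun onSubscribe(conn: CoConnection) {}

        override suspend fun onNext(t: T) {
            hasValue = true
            last = t
        }

        override suspend fun onError(t: Throwable) = cont.resumeWithException(t)

        override suspend fun onComplete() {
            if (hasValue) cont.resume(last as T)
            else cont.resumeWithException(NoSuchElementException("CoFlow is empty"))
        }
    }
    // Run the subscription as its own coroutine; the consumer resumes `cont` on termination.
    suspend { source.subscribe(consumer) }.startCoroutine(Continuation(EmptyCoroutineContext) { })
}

// Hypothetical synchronous source emitting start, start + 1, ..., start + count - 1.
fun range(start: Int, count: Int): CoFlow<Int> = object : CoFlow<Int> {
    override suspend fun subscribe(consumer: CoConsumer<Int>) {
        consumer.onSubscribe(object : CoConnection {
            override suspend fun close() {}
        })
        for (i in start until start + count) consumer.onNext(i)
        consumer.onComplete()
    }
}

// Hypothetical runner; only valid when the block completes without really suspending.
fun <T> runSync(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended; runSync needs a synchronous source" }.getOrThrow()
}
```

For example, runSync { range(1, 5).awaitLast() } returns 5, while an empty range(1, 0) makes awaitLast() throw NoSuchElementException, mirroring awaitFirst().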


The benchmark


Since benchmarking concurrent performance would be somewhat unfair at this point, the next best benchmark I can think of is our standard Shakespeare Plays Scrabble. It shows the infrastructure overhead of a solution without requiring any explicit concurrency from it.

Rather than showing the somewhat long Kotlin source code adapted for CoFlow here, I'll point you to the benchmark code in my repository. The environment: i7 4770K, Windows 7 x64, Java 8u144, Kotlin 1.1.4-3, Coroutines 0.18, and RxJava 2.1.3 for comparison:

    RxJava Flowable: 26 milliseconds / op
    Coroutines CoFlow: 52.4 milliseconds / op

Not bad for a first try with limited knowledge. I can only speculate about the source of CoFlow's 2x slowdown: Channel. As far as I can tell, it is meant to support multiple senders and multiple receivers, thus its internal queue likely performs far more atomic operations than necessary for our single-producer single-consumer CoFlow/Reactive-Streams architecture.

Conclusion


As demonstrated, it is possible to rewrite (a set of) RxJava operators with coroutines and, depending on the use case, even this (unoptimized) 2x overhead could be acceptable. Does this mean the remaining 180 or so operators can be translated (reasonably) well?

I don't know yet; flatMap(), groupBy() and window() are notoriously the most difficult operators due to their increased concurrency and their interaction with backpressure:


  • flatMap has to manage a dynamic set of sources, each of which has to be backpressured. Should each of them use the same Channel.send() or go round-robin in some way?
  • groupBy is prone to livelock if the groups, as a whole and individually, are not consumed.
  • window has a peculiar operation mode (also true for groupBy): if one takes only one window, the upstream should not be cancelled until the items aimed at that window have been emitted by the upstream or the consumption of the window is cancelled.

Can RxJava be ported to Kotlin Coroutines? Yes. Should the next RxJava be written in Kotlin Coroutines instead? I don't think so. The reasons I'm still not in favor of "Coroutines everywhere", despite all the code shown in this post, are:

  • I had to do this porting myself, which hardly constitutes an unbiased and independent verification.
  • The coroutine concept is great, but it is tied to Kotlin as a compiler and to its standard library. What should happen with the non-Kotlin, non-Android reactive users? What about other JVM languages?
  • Building the state machine is hidden from the developer by the compiler. There is always the risk that the compiler doesn't do a reasonable optimization job and/or introduces bugs you can't easily work around from the user level. How often are the Kotlin language and standard library updated to fix issues? How is that SAM issue doing?

Solving problems developers face is great; hyping about "burying Reactive programming as obsolete" without supporting evidence is not.