Friday, June 12, 2015

Schedulers (part 3)

Introduction

In this blog post, I'm going to talk about the case when one wants to wrap an existing multi-threaded Executor and ensure the contracts on Scheduler and Worker are enforced in some manner.

The most important contract on Worker is that tasks submitted sequentially have to execute in the same order. The threads of a multi-threaded Executor, however, pick tasks up in no particular order and may execute them out of order and in parallel.

The solution is to use a form of the queue-drain approach we have seen many times and trampoline the scheduled actions. The queue makes sure the order of the actions is kept, and the drain logic makes sure only one drain action runs at a time, so parallel execution is avoided.
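
Before moving to the RxJava classes, the queue-drain idea can be sketched with plain java.util.concurrent types. This is only a minimal illustration under stated assumptions: SerializingExecutor and runDemo are hypothetical names invented for this sketch, and cancellation and delays are left out on purpose.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not an RxJava class): runs tasks one at a time,
// in submission order, on top of any multi-threaded Executor.
final class SerializingExecutor implements Executor, Runnable {
    final Executor exec;
    final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();

    SerializingExecutor(Executor exec) {
        this.exec = exec;
    }

    @Override
    public void execute(Runnable task) {
        queue.offer(task);                 // the queue keeps FIFO order
        if (wip.getAndIncrement() == 0) {  // 0 -> 1: this caller starts the drain
            exec.execute(this);
        }
    }

    @Override
    public void run() {
        do {
            Runnable r = queue.poll();
            if (r != null) {
                r.run();
            }
        } while (wip.decrementAndGet() > 0); // loop while more tasks were counted
    }

    // Submits count tasks from one thread and returns the order they ran in.
    static List<Integer> runDemo(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            SerializingExecutor serial = new SerializingExecutor(pool);
            List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
            CountDownLatch done = new CountDownLatch(count);
            for (int i = 0; i < count; i++) {
                final int id = i;
                serial.execute(() -> {
                    seen.add(id);
                    done.countDown();
                });
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            return new ArrayList<>(seen);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo(10)); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

Even though the pool has four threads, the tasks run in submission order because only the caller that moves wip from 0 to 1 gets to start a drain.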

The ExecutorScheduler

As usual, we start with the skeleton of the class:

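// Imports used by the snippets in this post (RxJava 1.x);
// ScheduledAction is the class from part 2.
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

public final class ExecutorScheduler extends Scheduler {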
    final Executor exec;
    public ExecutorScheduler(Executor exec) {
        this.exec = exec;
    }
    @Override
    public Worker createWorker() {
        return new ExecutorWorker();
    }
    
    final class ExecutorWorker extends Worker 
    implements Runnable {                             // (1)
        // data fields here
        @Override
        public Subscription schedule(
                Action0 action) {
            // implement
        }
        @Override
        public Subscription schedule(
                Action0 action, long delayTime,
                TimeUnit unit) {
            // implement
        }
        @Override
        public void run() {
            // implement
        }
        @Override
        public boolean isUnsubscribed() {
            // implement
        }
        @Override
        public void unsubscribe() {
            // implement
        }
    }
}

All worker instances delegate the scheduled actions to the same underlying Executor. It is interesting to note (1) that by making ExecutorWorker implement Runnable, we save the creation of a separate Runnable that would otherwise be needed in the drain phase.

The queue-drain logic requires a queue and a work-in-progress indicator and the worker requires a tracking subscription container to support mass-unsubscription:

        // ...
        final AtomicInteger wip = new AtomicInteger();
        final Queue<ScheduledAction> queue = new ConcurrentLinkedQueue<>();
        final CompositeSubscription tracking = new CompositeSubscription();
        // ...

The use of ConcurrentLinkedQueue may upset performance enthusiasts (like me), since it would appear that a multi-producer single-consumer queue, for example MpscLinkedQueue from JCTools or even my own MpscLinkedArrayQueue, would be enough.

However, there is a tradeoff here: are we willing to pay the cost of retaining cancelled tasks or not? This wasn't an issue with the standard Schedulers backed by ScheduledExecutorService instances because cancelled tasks were removed from their internal queue automatically (or by a periodic purge in a Java 6 environment). Such a removal option is not available with JCTools or my queue; therefore, the best option, for now, is to use ConcurrentLinkedQueue when task retention is an issue. (One can, of course, implement a specialized queue and task object that know about each other so the unsubscription can locate the task inside the queue and CAS it out with a tombstone task.) Note, however, that the removal costs O(n) per task.
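
To make the retention tradeoff more concrete, here is a small sketch of what removal from a ConcurrentLinkedQueue looks like. Task and idsLeftAfterCancel are hypothetical names used only for this illustration; Task merely stands in for a cancellable action.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class QueueRemovalDemo {
    // Hypothetical stand-in for a queued, cancellable action.
    static final class Task implements Runnable {
        final int id;
        Task(int id) {
            this.id = id;
        }
        @Override
        public void run() {
            // no-op: only the queue bookkeeping matters here
        }
    }

    // Enqueues total tasks, "cancels" every cancelEvery-th one by removing it
    // from the queue, and returns the ids of the tasks still waiting.
    static List<Integer> idsLeftAfterCancel(int total, int cancelEvery) {
        Queue<Task> queue = new ConcurrentLinkedQueue<>();
        List<Task> all = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            Task t = new Task(i);
            all.add(t);
            queue.offer(t);
        }
        for (int i = 0; i < total; i += cancelEvery) {
            // remove(Object) walks the queue from its head: O(n) per call,
            // but the cancelled task is no longer retained by the queue.
            queue.remove(all.get(i));
        }
        List<Integer> ids = new ArrayList<>();
        for (Task t : queue) {
            ids.add(t.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(idsLeftAfterCancel(6, 2)); // prints [1, 3, 5]
    }
}
```

A queue without a remove operation would keep the cancelled tasks until the drain loop reaches them, which is exactly the retention cost discussed above.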

Given the base data fields, let's implement the easier methods of the ExecutorWorker:

        // ...
        @Override
        public boolean isUnsubscribed() {
            return tracking.isUnsubscribed();
        }
        @Override
        public void unsubscribe() {
            queue.clear();
            tracking.unsubscribe();
        }
    }
}

Note that since we are not in control of the Executor's lifecycle, it would be unwise to try to shut it down; even if we did, other worker instances would stop working as well. The best we can do is to track the tasks submitted to this particular worker and mass-unsubscribe only them.

Now come the complicated parts. First, let's see the non-delayed schedule() method:

    @Override
    public Subscription schedule(Action0 action) {
        if (isUnsubscribed()) {
            return Subscriptions.unsubscribed();
        }
        ScheduledAction sa = 
                new ScheduledAction(action);
        tracking.add(sa);
        sa.add(Subscriptions.create(
                () -> tracking.remove(sa)));        // (1)
           
        queue.offer(sa);                            // (2)
            
        sa.add(Subscriptions.create(
                () -> queue.remove(sa)));           // (3)
            
        if (wip.getAndIncrement() == 0) {           // (4)
            exec.execute(this);                     // (5)
        }
            
        return sa;
    }

The method can't delegate to the delayed overload anymore and needs its own logic:

  1. We create our ScheduledAction (from part 2) and add the logic to remove itself from the tracking structure on unsubscription.
  2. We offer the action to the queue which will keep the FIFO order of the actions submitted sequentially.
  3. Then we add the logic to remove our task from the queue in case it gets unsubscribed. Note that this step is O(n), where n is the number of tasks waiting in the queue ahead of the instance.
  4. We allow only a single drainer: the caller that moves the wip counter from 0 to 1.
  5. If the current call to schedule() won the right to drain the queue, we submit the worker itself so its run() method can poll the queue for available tasks.

Note that even if we had an ExecutorService, we couldn't associate the Future returned by submitting this with any particular action; therefore, cancellation has to happen in an indirect way.

Now let's continue with the run() method's implementation:


    @Override
    public void run() {
        do {
            if (isUnsubscribed()) {                   // (1)
                queue.clear();
                return;
            }
            ScheduledAction sa = queue.poll();        // (2)
            if (sa != null && !sa.isUnsubscribed()) {
                sa.run();                             // (3)
            }
        } while (wip.decrementAndGet() > 0);          // (4)
    }

A relatively straightforward drain logic:

  1. We check if the worker has been unsubscribed in the meantime, if so, we clear the queue (just in case) and return.
  2. We poll the next task from the queue.
  3. Since a removal or unsubscription could empty the queue while run() is executing, we need to check for null, and we also need to check whether the particular ScheduledAction has been unsubscribed. If not, we execute it.
  4. We decrement the wip counter and keep looping until it reaches zero. After that, it is safe for a later schedule() call to submit the worker to the Executor again when there are more tasks to drain.
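
The null check in step 3 can be seen in isolation with a plain-Java sketch. CancellableSerialWorker and runDemo are hypothetical names for this illustration, and the cancel handle is a simple Runnable rather than a Subscription. Because a cancelled task leaves the queue while its wip count stays, the drain loop has to tolerate poll() returning null.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified worker: queue-drain plus cancellation by removal.
final class CancellableSerialWorker implements Runnable {
    final Executor exec;
    final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();

    CancellableSerialWorker(Executor exec) {
        this.exec = exec;
    }

    // Enqueues the task and returns a handle whose run() cancels it.
    Runnable schedule(Runnable task) {
        queue.offer(task);
        if (wip.getAndIncrement() == 0) {
            exec.execute(this);
        }
        return () -> queue.remove(task);
    }

    @Override
    public void run() {
        do {
            Runnable r = queue.poll();
            if (r != null) {   // null: a cancelled task left its wip count behind
                r.run();
            }
        } while (wip.decrementAndGet() > 0);
    }

    static List<String> runDemo() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CancellableSerialWorker w = new CancellableSerialWorker(pool);
            List<String> seen = Collections.synchronizedList(new ArrayList<>());
            CountDownLatch gate = new CountDownLatch(1);
            CountDownLatch done = new CountDownLatch(1);

            w.schedule(() -> {           // A blocks the drain until the gate opens
                try {
                    gate.await(5, TimeUnit.SECONDS);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                seen.add("A");
            });
            Runnable cancelB = w.schedule(() -> seen.add("B"));
            w.schedule(() -> {
                seen.add("C");
                done.countDown();
            });

            cancelB.run();               // B is removed while still queued
            gate.countDown();
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            return new ArrayList<>(seen);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [A, C]
    }
}
```

In the demo, three tasks are counted in wip but only two are left in the queue, so the third pass of the loop polls null and simply decrements the counter.
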

Finally, the most complicated method is the delayed schedule():

    @Override
    public Subscription schedule(
            Action0 action, 
            long delayTime,
            TimeUnit unit) {

        if (delayTime <= 0) {
            return schedule(action);                      // (1)
        }
        if (isUnsubscribed()) {
            return Subscriptions.unsubscribed();          // (2)
        }
        
        ScheduledAction sa = 
                new ScheduledAction(action);
        tracking.add(sa);
        sa.add(Subscriptions.create(
                () -> tracking.remove(sa)));              // (3)
        
        ScheduledExecutorService schedex;
        if (exec instanceof ScheduledExecutorService) {
            schedex = (ScheduledExecutorService) exec;    // (4)
        } else {
            schedex = CustomWorker.genericScheduler;      // (5)
        }
        
        Future<?> f = schedex.schedule(() -> {            // (6)
            
            queue.offer(sa);                              // (7)
            
            sa.add(Subscriptions.create(
                    () -> queue.remove(sa)));
            
            if (wip.getAndIncrement() == 0) {
                exec.execute(this);
            }
            
        }, delayTime, unit);
        
        sa.add(Subscriptions.create(
                () -> f.cancel(false)));                  // (8)
        
        return sa;
    }

It looks very similar to the non-delayed schedule() method, but it has to handle the delay in some fashion:

  1. To avoid the unnecessary overhead, we delegate scheduling of tasks with non-positive delay back to the non-delayed schedule() method.
  2. We return an unsubscribed Subscription if the worker has been unsubscribed.
  3. We wrap the action with our ScheduledAction, add it to the tracking structure and register a removal action as well.
  4. We need a scheduling-capable service for the delayed execution, so we check whether the Executor we've got is capable of doing it itself,
  5. or we need to arrange it ourselves, for example, by using the genericScheduler of our CustomWorker from part 2.
  6. With a scheduling service in hand, we schedule a task that will then enqueue the actual task after the specified delay.
  7. In this helper task, we perform the same steps as in the non-delayed schedule() call, but we don't need to wrap our ScheduledAction again. It offers the ScheduledAction to the queue, creates the remove action and does the wip-increment trick to jump-start the drain if necessary.
  8. Once scheduling the helper task returns a Future, we add a cancellation action to the ScheduledAction so that unsubscription cancels the pending delayed task.
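
The two-stage idea in steps 6 to 8 (a timer only enqueues, the Executor does the actual work) can also be sketched without RxJava. DelayedSerialWorker and runDemo are hypothetical names, and for brevity the sketch always uses a separate timer service instead of checking whether the Executor itself can schedule.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified worker: the timer enqueues, the Executor drains.
final class DelayedSerialWorker implements Runnable {
    final Executor exec;
    final ScheduledExecutorService timer;
    final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();

    DelayedSerialWorker(Executor exec, ScheduledExecutorService timer) {
        this.exec = exec;
        this.timer = timer;
    }

    // The returned Future cancels the pending delay, not the running task.
    Future<?> schedule(Runnable task, long delay, TimeUnit unit) {
        return timer.schedule(() -> {
            queue.offer(task);                 // enqueue only after the delay
            if (wip.getAndIncrement() == 0) {
                exec.execute(this);            // jump-start the drain
            }
        }, delay, unit);
    }

    @Override
    public void run() {
        do {
            Runnable r = queue.poll();
            if (r != null) {
                r.run();
            }
        } while (wip.decrementAndGet() > 0);
    }

    static List<String> runDemo() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            DelayedSerialWorker w = new DelayedSerialWorker(pool, timer);
            List<String> seen = Collections.synchronizedList(new ArrayList<>());
            CountDownLatch done = new CountDownLatch(1);

            w.schedule(() -> seen.add("fast"), 50, TimeUnit.MILLISECONDS);
            Future<?> f = w.schedule(() -> seen.add("cancelled"), 300, TimeUnit.MILLISECONDS);
            f.cancel(false);                   // cancelled long before its delay elapses
            w.schedule(() -> {
                seen.add("slow");
                done.countDown();
            }, 600, TimeUnit.MILLISECONDS);

            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            return new ArrayList<>(seen);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            timer.shutdownNow();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [fast, slow]
    }
}
```

Cancelling the Future only prevents the enqueue step; a task that has already reached the queue would need the queue-removal logic shown in the non-delayed path.
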

Naturally, we will try the ExecutorScheduler out:

ExecutorService exec = Executors.newFixedThreadPool(3);
try {
    Scheduler s = new ExecutorScheduler(exec);

    Observable<Integer> source = Observable.just(1)
    .delay(500, TimeUnit.MILLISECONDS, s)
    .doOnNext(v -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        System.out.println(Thread.currentThread());
    });
    
    TestSubscriber<Integer> ts1 = new TestSubscriber<>();
    TestSubscriber<Integer> ts2 = new TestSubscriber<>();
    TestSubscriber<Integer> ts3 = new TestSubscriber<>();
    
    source.subscribe(ts1);
    source.subscribe(ts2);
    source.subscribe(ts3);
    
    ts1.awaitTerminalEvent();
    ts1.assertNoErrors();
    ts1.assertValue(1);
    
    ts2.awaitTerminalEvent();
    ts2.assertNoErrors();
    ts2.assertValue(1);

    ts3.awaitTerminalEvent();
    ts3.assertNoErrors();
    ts3.assertValue(1);
} finally {
    exec.shutdown();
}

Which prints something like this:

Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-2,5,main]

Conclusion

In this part about Schedulers, I've talked about the need to serialize execution of non-delayed actions scheduled by the same worker in case the actual underlying Executor is multi-threaded and can't guarantee it.

In the next post, I'll talk about the case when a scheduling framework, such as a GUI event loop, doesn't offer a Future-like cancellation possibility, and I'll show how one can interoperate with such an API through a Scheduler.
