
The Java Specialists' Newsletter
Issue 214, 2013-10-10. Category: Concurrency. Java version: Java 8


CountingCompletionService

by Dr. Heinz M. Kabutz
Abstract:
CompletionService queues finished tasks, making it easier to retrieve Futures in order of completion. But it lacks some basic functionality, such as a count of how many tasks have been submitted.

Welcome to the 214th issue of The Java(tm) Specialists' Newsletter, sent from sunny Crete. As a proud owner of a polyteknos card (meaning I have produced four new Greek taxpayers), I now get discounts almost everywhere I go in Greece. All forms of public transport, ferries, shops. Spar gives me 3% off everything, including beer. That's fair, I think. Beer is important.

The dates for our fourth Java Specialists Symposium are 25-29 August 2014. In 2013 we almost reached our maximum capacity, so the secret to getting a place next year is to put your name down early. If you think you'd like to attend, please send me a short motivation explaining why you should be invited. I will pass on your details to the "disorganizers", who will make the final decision.

I assembled a list of airlines that flew directly to Chania Airport (CHQ) in August 2013. Once you have seen what is on offer, you can find further information by Googling for "flights to CHQ". I hope this will make it easier for you to visit our island, either for the unconference or just for fun :-)

CountingCompletionService

ExecutorService provides a convenient way to submit tasks that are then executed asynchronously by threads managed by the service. Instead of paying the price of thread start latency, the job can be started immediately by an available thread. However, this reduced latency only gives us a performance advantage if a thread is currently idle, waiting for work. If that is not the case, then either a new thread has to be started, as happens with the cached thread pool, or the task has to be put in a waiting queue, as you would find in the fixed thread pool. In either case, our latency can be greater than if we had just started a thread directly. As Kirk says: "When last did you see a queue and say to yerself - yay, that will go fast!"
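You can observe the difference between the two pools directly. The following sketch is only an illustration. The class PoolQueueingDemo and its methods are hypothetical. The two helper methods build ThreadPoolExecutors with the same parameters that Executors.newFixedThreadPool(1) and Executors.newCachedThreadPool() use. Each pool receives two tasks that block on a CountDownLatch. The sketch then reports how many tasks are waiting in the pool's queue and how many threads the pool holds.

```java
import java.util.concurrent.*;

public class PoolQueueingDemo {
  // Hypothetical helper: submits two tasks that block on a latch and
  // returns {tasks waiting in the queue, threads in the pool}.
  static int[] observe(ThreadPoolExecutor pool) throws InterruptedException {
    CountDownLatch release = new CountDownLatch(1);
    Runnable blocker = () -> {
      try {
        release.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    pool.execute(blocker);
    pool.execute(blocker);
    // no worker can take anything from the queue until the latch is released
    int[] result = {pool.getQueue().size(), pool.getPoolSize()};
    release.countDown();
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    return result;
  }

  // same parameters as Executors.newFixedThreadPool(1)
  static ThreadPoolExecutor fixedPool() {
    return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>());
  }

  // same parameters as Executors.newCachedThreadPool()
  static ThreadPoolExecutor cachedPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
        new SynchronousQueue<>());
  }

  public static void main(String... args) throws InterruptedException {
    int[] fixed = observe(fixedPool());
    System.out.printf("fixed:  queued=%d, threads=%d%n", fixed[0], fixed[1]);
    int[] cached = observe(cachedPool());
    System.out.printf("cached: queued=%d, threads=%d%n", cached[0], cached[1]);
  }
}
```

In the fixed pool, the second task has to wait in the LinkedBlockingQueue (queued=1, threads=1). The cached pool hands tasks over through a SynchronousQueue, which never holds elements. It therefore starts a second thread instead (queued=0, threads=2).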

Let's think about queuing a bit, based on our experience in real life. Most of us have been to airports. Imagine you are walking towards the check-in queue. At the same time, you see a large family walking towards the queue from the other side. What do you do? You subtly up your pace a bit, hoping to get there first. Once you are in the queue, it does not really matter what is happening around you, as you have to wait anyway. Until you get near the front, that is. For some reason, the person directly in front of me always seems fast asleep. The agent at the open counter is gesticulating wildly with her arms, but our hero is obliviously studying his fingernails. Even though a single queue served by multiple servers theoretically gives us the shortest average wait time, we get contention at the head and the tail of that single queue. The same contention happens with the traditional thread pool design, where all worker threads take tasks from one shared queue. This is one of the reasons why the ForkJoinPool employs several queues and work stealing. Tasks may sometimes be executed out of order, but because tasks are supposed to be independent, this should not cause issues.
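For comparison, here is a minimal sketch of a task written for the ForkJoinPool. The class ForkJoinSum and its threshold of 1,000 elements are assumptions made for this illustration. Each worker thread in a ForkJoinPool has its own deque. When a worker calls fork(), the subtask is pushed onto that worker's deque. Idle workers may steal tasks from the deques of busy workers, so they do not all contend for one shared queue.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSum extends RecursiveTask<Long> {
  private static final int THRESHOLD = 1_000; // assumed cut-off for summing sequentially
  private final long[] numbers;
  private final int from; // inclusive
  private final int to;   // exclusive

  ForkJoinSum(long[] numbers, int from, int to) {
    this.numbers = numbers;
    this.from = from;
    this.to = to;
  }

  @Override
  protected Long compute() {
    if (to - from <= THRESHOLD) {
      long sum = 0;
      for (int i = from; i < to; i++) sum += numbers[i];
      return sum;
    }
    int mid = (from + to) >>> 1;
    ForkJoinSum left = new ForkJoinSum(numbers, from, mid);
    ForkJoinSum right = new ForkJoinSum(numbers, mid, to);
    left.fork();                     // pushed onto this worker's own deque; may be stolen
    long rightSum = right.compute(); // meanwhile, work on the other half ourselves
    return rightSum + left.join();
  }

  static long sum(ForkJoinPool pool, long[] numbers) {
    return pool.invoke(new ForkJoinSum(numbers, 0, numbers.length));
  }

  public static void main(String... args) {
    long[] numbers = new long[1_000_000];
    for (int i = 0; i < numbers.length; i++) numbers[i] = i + 1;
    ForkJoinPool pool = new ForkJoinPool();
    System.out.println("sum = " + sum(pool, numbers));
    // only an estimate; varies from run to run
    System.out.println("steals = " + pool.getStealCount());
    pool.shutdown();
  }
}
```

The subtasks are independent, so the order in which they run does not matter. Only the final sum does.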

Usually when you submit a Callable<V> to an ExecutorService, you need to manage the Future<V> that is returned by the submit() method. However, since each task may take a different time to complete, you could quite easily block on a future whilst another future might already be available. Here is an example (using Java 8 lambdas - see Maurice Naftalin's Lambda FAQ):

import java.util.*;
import java.util.concurrent.*;

public class ExecutorServiceExample {
  public static void main(String... args) throws Exception {
    try (DotPrinter dp = new DotPrinter()) {
      ExecutorService pool = Executors.newCachedThreadPool();
      Collection<Future<Integer>> futures = new ArrayList<>();
      for (int i = 0; i < 10; i++) {
        int sleeptime = 5 - i % 5;
        int order = i;
        futures.add(pool.submit(() -> {
          TimeUnit.SECONDS.sleep(sleeptime);
          return order;
        }));
      }

      for (Future<Integer> future : futures) {
        System.out.printf("Job %d is done%n", future.get());
      }
      pool.shutdown();
    }
  }
}
  

Here is the code for our DotPrinter, whose entire purpose in life is to keep our attention occupied by printing a meaningless dot once every second:

import java.util.concurrent.*;

public class DotPrinter implements AutoCloseable {
  private final ScheduledExecutorService timer =
      Executors.newSingleThreadScheduledExecutor();

  public DotPrinter() {
    timer.scheduleAtFixedRate(() -> {
      System.out.print(".");
      System.out.flush();
    }, 1, 1, TimeUnit.SECONDS);
  }

  public void close() {
    timer.shutdown();
  }
}
  

Output would be the following:

.....Job 0 is done
Job 1 is done
Job 2 is done
Job 3 is done
Job 4 is done
Job 5 is done
Job 6 is done
Job 7 is done
Job 8 is done
Job 9 is done
  

As the output shows, we iterate through the futures in the order we submitted them, so we get the results in order of submission. However, the first job takes the longest. We therefore have to wait for it to finish before we see the results of jobs that have been available for some time. To solve that, Java also has an ExecutorCompletionService. It is a very simple class that contains a BlockingQueue of completed tasks and specialized Futures that enqueue themselves when their done() method is called.
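The mechanism can be sketched in a few lines. This is a simplified, hypothetical MiniCompletionService, not the JDK source. It relies on the protected done() hook of FutureTask, which is called once the task has completed, whether normally, with an exception or by cancellation. The FutureTask subclass overrides done() so that the future adds itself to a BlockingQueue.

```java
import java.util.concurrent.*;

public class MiniCompletionService<V> {
  private final Executor executor;
  private final BlockingQueue<Future<V>> completed = new LinkedBlockingQueue<>();

  public MiniCompletionService(Executor executor) {
    this.executor = executor;
  }

  // A FutureTask that enqueues itself as soon as it is done.
  private class QueueingFuture extends FutureTask<V> {
    QueueingFuture(Callable<V> task) {
      super(task);
    }

    @Override
    protected void done() {
      completed.add(this);
    }
  }

  public Future<V> submit(Callable<V> task) {
    QueueingFuture future = new QueueingFuture(task);
    executor.execute(future);
    return future;
  }

  // Blocks until the next task has finished.
  public Future<V> take() throws InterruptedException {
    return completed.take();
  }

  public static void main(String... args) throws Exception {
    ExecutorService pool = Executors.newCachedThreadPool();
    MiniCompletionService<Integer> service = new MiniCompletionService<>(pool);
    for (int i = 0; i < 3; i++) {
      int order = i;
      int sleepMillis = (3 - i) * 100; // the last task submitted sleeps the shortest
      service.submit(() -> {
        TimeUnit.MILLISECONDS.sleep(sleepMillis);
        return order;
      });
    }
    for (int i = 0; i < 3; i++) {
      System.out.printf("Job %d is done%n", service.take().get());
    }
    pool.shutdown();
  }
}
```

Run as is, it should report Job 2, then Job 1, then Job 0, because take() hands out futures in the order in which they finished. The JDK class also supports submit(Runnable, V) and the poll() methods, which this sketch leaves out.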

The ExecutorCompletionService lacks some basic functionality, such as a way to find out how many tasks have been submitted. In our next example, we show how we could use the CompletionService to improve the ExecutorServiceExample above.

import java.util.concurrent.*;

public class CompletionServiceExample {
  public static void main(String... args) throws Exception {
    try (DotPrinter dp = new DotPrinter()) {
      ExecutorService pool = Executors.newCachedThreadPool();
      CompletionService<Integer> service =
          new ExecutorCompletionService<>(pool);
      for (int i = 0; i < 10; i++) {
        int sleeptime = 5 - i % 5;
        int order = i;
        service.submit(() -> { // time to get used to lambdas?
          TimeUnit.SECONDS.sleep(sleeptime);
          return order;
        });
      }

      for (int i = 0; i < 10; i++) {
        System.out.printf("Job %d is done%n", service.take().get());
      }
      pool.shutdown();
    }
  }
}
  

The output is now:

.Job 4 is done
Job 9 is done
.Job 3 is done
Job 8 is done
.Job 2 is done
Job 7 is done
.Job 1 is done
Job 6 is done
.Job 0 is done
Job 5 is done
  

Nice. However, we still need to remember how many tasks we added to the CompletionService. Instead of tracking that ourselves, we could extend the ExecutorCompletionService and add that functionality:

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class CountingCompletionService<V>
    extends ExecutorCompletionService<V> {
  // tasks accepted by submit()
  private final AtomicLong submittedTasks = new AtomicLong();
  // Futures handed out by take() and poll()
  private final AtomicLong completedTasks = new AtomicLong();

  public CountingCompletionService(Executor executor) {
    super(executor);
  }

  public CountingCompletionService(
      Executor executor, BlockingQueue<Future<V>> queue) {
    super(executor, queue);
  }

  // We count before submitting, so that a fast task cannot be taken
  // (and counted as completed) before it was counted as submitted.
  // If the submission fails, we undo the count.
  @Override
  public Future<V> submit(Callable<V> task) {
    submittedTasks.incrementAndGet();
    try {
      return super.submit(task);
    } catch (RuntimeException e) {
      submittedTasks.decrementAndGet();
      throw e;
    }
  }

  @Override
  public Future<V> submit(Runnable task, V result) {
    submittedTasks.incrementAndGet();
    try {
      return super.submit(task, result);
    } catch (RuntimeException e) {
      submittedTasks.decrementAndGet();
      throw e;
    }
  }

  @Override
  public Future<V> take() throws InterruptedException {
    Future<V> future = super.take();
    completedTasks.incrementAndGet();
    return future;
  }

  @Override
  public Future<V> poll() {
    Future<V> future = super.poll();
    if (future != null) completedTasks.incrementAndGet();
    return future;
  }

  @Override
  public Future<V> poll(long timeout, TimeUnit unit)
      throws InterruptedException {
    Future<V> future = super.poll(timeout, unit);
    if (future != null) completedTasks.incrementAndGet();
    return future;
  }

  public long getNumberOfCompletedTasks() {
    return completedTasks.get();
  }

  public long getNumberOfSubmittedTasks() {
    return submittedTasks.get();
  }

  public boolean hasUncompletedTasks() {
    return completedTasks.get() < submittedTasks.get();
  }
}
  

We can thus replace the loop that retrieves the results with the following, provided that service is now declared as a CountingCompletionService<Integer>:

for (int i = 0; i < service.getNumberOfSubmittedTasks(); i++) {
  System.out.printf("Job %d is done%n", service.take().get());
}
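Whether we keep the count ourselves or let CountingCompletionService do it, the number of retrievals must match the number of submitted tasks. take() blocks until a completed Future is available, so a single extra call would hang forever. The sketch below uses a plain ExecutorCompletionService, and the class DrainWithTimeout and its method extraPoll are hypothetical. It drains exactly as many results as it submitted. It then calls poll(timeout, unit), which returns null when the timeout elapses, to show that nothing is left.

```java
import java.util.concurrent.*;

public class DrainWithTimeout {
  // Submits n quick tasks, takes exactly n results, then makes one more
  // retrieval attempt with a timeout. take() here would block forever.
  static Future<Integer> extraPoll(int n) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CompletionService<Integer> service = new ExecutorCompletionService<>(pool);
    for (int i = 0; i < n; i++) {
      int order = i;
      service.submit(() -> order);
    }
    for (int i = 0; i < n; i++) {
      System.out.printf("Job %d is done%n", service.take().get());
    }
    Future<Integer> extra = service.poll(100, TimeUnit.MILLISECONDS);
    pool.shutdown();
    return extra; // null: every completed Future has already been taken
  }

  public static void main(String... args) throws Exception {
    System.out.println("extra poll returned " + extraPoll(5));
  }
}
```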
  

Of course, an even nicer solution would be to build an iterable CompletionService. The loop above could then be replaced with simply:

for (Future<Integer> future : service) {
  System.out.printf("Job %d is done%n", future.get());
}
  

One of the exercises in my Concurrency Specialist Course is writing such an Iterable. It looks really simple, but there are a few gotchas that you need to be aware of. If you'd like to test your skill, here is an outline of what you need to do:

import java.util.*;
import java.util.concurrent.*;

public class CompletionServiceIterable<V>
    implements Iterable<Future<V>> {
  public CompletionServiceIterable() { // TODO
  }

  public void submit(Callable<V> task) { // TODO
  }

  public Iterator<Future<V>> iterator() { // TODO
    throw new UnsupportedOperationException("not implemented yet");
  }

  public void shutdown() { // TODO
  }

  public boolean isTerminated() { // TODO
    throw new UnsupportedOperationException("not implemented yet");
  }
}
  

If you send me your solution, I'll run it against my unit tests and then give you my personal feedback. You should solve it without using any external libraries, just the JDK (6, 7 or 8) and your own code. Good luck :-)

Kind regards

Heinz

© 2010-2014 Heinz Kabutz - All Rights Reserved
Oracle and Java are registered trademarks of Oracle and/or its affiliates. Other names may be trademarks of their respective owners. JavaSpecialists.eu is not connected to Oracle, Inc. and is not sponsored by Oracle, Inc.