
Issue 214: CountingCompletionService

Author: Dr. Heinz M. Kabutz
Date: 2013-10-10
Java Version: 8
Category: Concurrency
 

Abstract: CompletionService queues finished tasks, making it easier to retrieve Futures in order of completion. But it lacks some basic functionality, such as a count of how many tasks have been submitted.

 

Welcome to the 214th issue of The Java(tm) Specialists' Newsletter, sent from sunny Crete. As a proud owner of a polyteknos card (meaning I have produced four new Greek taxpayers), I now get discounts almost everywhere I go in Greece. All forms of public transport, ferries, shops. Spar gives me 3% off everything, including beer. That's fair, I think. Beer is important.


CountingCompletionService

ExecutorService provides a convenient way to submit tasks that are then executed asynchronously by threads provided by the service. Instead of paying the price of thread start latency, the job can be started immediately by an available thread. However, avoiding thread start latency only gives us a performance advantage if a thread is currently idle, waiting for work. If that is not the case, then either a new thread has to be started, as happens with the cached thread pool, or the task has to be put in a waiting queue, as you would find in the fixed thread pool once all of its threads are busy. In either case, our latency can be greater than if we had just started a thread directly. As Kirk says: "When last did you see a queue and say to yerself - yay, that will go fast!"
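To see the queuing for ourselves, here is a small sketch of a fixed thread pool with a single thread. The class name FixedPoolQueueingSketch and the method queuedWhileBusy() are made up for this illustration. The pool is constructed by hand with the settings that I would expect from Executors.newFixedThreadPool(1), simply so that we can peek at its work queue:

```java
import java.util.concurrent.*;

class FixedPoolQueueingSketch {
  // Returns how many tasks sit in the work queue while the only thread is busy
  static int queuedWhileBusy() {
    // one core thread, one maximum thread, unbounded queue
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    try {
      pool.submit(() -> {
        started.countDown(); // tell the caller that the thread is occupied
        release.await();     // keep the thread busy until we are released
        return "first";
      });
      started.await();
      // no idle thread and no room for another one, so this task is queued
      Future<String> second = pool.submit(() -> "second");
      int queued = pool.getQueue().size();
      release.countDown();
      second.get();          // only now does the second task get its turn
      return queued;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdownNow();    // also interrupts the first task if we bail out early
    }
  }

  public static void main(String... args) {
    System.out.println("Tasks waiting in queue: " + queuedWhileBusy());
  }
}
```

The second task has to wait behind the first one, even though starting a fresh thread for it would have let it run straight away.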

Let's think about queuing a bit, based on our experience in real life. Most of us have been to airports. Imagine you are walking towards the check-in queue. At the same time, you see a large family walking towards the queue, but from the other side. What do you do? You subtly up your pace a bit, hoping to get there first. Once you are in the queue, it does not really matter what is happening around you, as you have to wait anyway. Until you get near the front, that is. For some reason, the person directly in front of me always seems fast asleep. The agent at the open counter is gesticulating wildly with her arms, but our hero is obliviously studying his fingernails. Even though a single queue feeding multiple servers theoretically gives us the shortest average wait time, we get contention at the head and tail of that single queue. The same contention happens with the traditional thread pool design and is one of the reasons why the ForkJoinPool employs several queues and work stealing. Tasks may sometimes be executed out of order, but because tasks are supposed to be independent, this should not cause issues.

Usually when you submit a Callable<V> to an ExecutorService, you need to manage the Future<V> that is returned by the submit() method. However, since each task may take a different time to complete, you could quite easily block on a future whilst another future might already be available. Here is an example (using Java 8 lambdas - see Maurice Naftalin's Lambda FAQ):

import java.util.*;
import java.util.concurrent.*;

public class ExecutorServiceExample {
  public static void main(String... args) throws Exception {
    try (DotPrinter dp = new DotPrinter()) {
      ExecutorService pool = Executors.newCachedThreadPool();
      Collection<Future<Integer>> futures = new ArrayList<>();
      for (int i = 0; i < 10; i++) {
        int sleeptime = 5 - i % 5;
        int order = i;
        futures.add(pool.submit(() -> {
          TimeUnit.SECONDS.sleep(sleeptime);
          return order;
        }));
      }

      for (Future<Integer> future : futures) {
        System.out.printf("Job %d is done%n", future.get());
      }
      pool.shutdown();
    }
  }
}

Here is the code for our DotPrinter, whose entire purpose in life is to keep our attention occupied by printing a meaningless dot once every second:

import java.util.concurrent.*;

public class DotPrinter implements AutoCloseable {
  private final ScheduledExecutorService timer =
      Executors.newSingleThreadScheduledExecutor();

  public DotPrinter() {
    timer.scheduleAtFixedRate(() -> {
      System.out.print(".");
      System.out.flush();
    }, 1, 1, TimeUnit.SECONDS);
  }

  public void close() {
    timer.shutdown();
  }
}

Output would be the following:

.....Job 0 is done
Job 1 is done
Job 2 is done
Job 3 is done
Job 4 is done
Job 5 is done
Job 6 is done
Job 7 is done
Job 8 is done
Job 9 is done

As the output shows, iterating through the futures in the order we submitted them gives us the results in order of submission. However, because the first job takes the longest, we have to wait until it is done before seeing the results of jobs that have been available for some time. To solve that, Java also has an ExecutorCompletionService. It is a very simple class that just contains a BlockingQueue of completed tasks and specialized Futures that enqueue themselves when their done() method is called, which happens as soon as the task has finished.
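To make the self-enqueueing idea a bit more concrete, here is a minimal sketch of how such a Future could be written on top of FutureTask. It is a simplified illustration and not the JDK source. The names SelfEnqueueingFutureSketch, SelfEnqueueingFuture and collectResults() are my own inventions, and the sleep times are arbitrary:

```java
import java.util.*;
import java.util.concurrent.*;

class SelfEnqueueingFutureSketch {
  // Simplified illustration, not the JDK implementation
  static class SelfEnqueueingFuture<V> extends FutureTask<V> {
    private final BlockingQueue<Future<V>> completionQueue;

    SelfEnqueueingFuture(Callable<V> task,
                         BlockingQueue<Future<V>> completionQueue) {
      super(task);
      this.completionQueue = completionQueue;
    }

    // FutureTask invokes done() once the task has finished, whether
    // normally, with an exception or through cancellation
    @Override
    protected void done() {
      completionQueue.add(this);
    }
  }

  // Runs three jobs and returns their results in the order in which
  // the futures arrived in the completion queue
  static List<Integer> collectResults() {
    ExecutorService pool = Executors.newCachedThreadPool();
    BlockingQueue<Future<Integer>> completionQueue =
        new LinkedBlockingQueue<>();
    try {
      for (int i = 0; i < 3; i++) {
        int sleeptime = 300 - i * 100; // the first job sleeps longest
        int order = i;
        pool.execute(new SelfEnqueueingFuture<Integer>(() -> {
          TimeUnit.MILLISECONDS.sleep(sleeptime);
          return order;
        }, completionQueue));
      }
      List<Integer> results = new ArrayList<>();
      for (int i = 0; i < 3; i++) {
        // take() blocks until the next finished future has enqueued itself
        results.add(completionQueue.take().get());
      }
      return results;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String... args) {
    System.out.println("Results in queue order: " + collectResults());
  }
}
```

Since the futures only land in the queue once they are done, get() on a future that came out of the queue never has to block.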

The ExecutorCompletionService lacks some basic functionality, such as a way to find out how many tasks have been submitted. In our next example, we show how we could use the CompletionService to improve the ExecutorServiceExample above.

import java.util.concurrent.*;

public class CompletionServiceExample {
  public static void main(String... args) throws Exception {
    try (DotPrinter dp = new DotPrinter()) {
      ExecutorService pool = Executors.newCachedThreadPool();
      CompletionService<Integer> service =
          new ExecutorCompletionService<>(pool);
      for (int i = 0; i < 10; i++) {
        int sleeptime = 5 - i % 5;
        int order = i;
        service.submit(() -> { // time to get used to lambdas?
          TimeUnit.SECONDS.sleep(sleeptime);
          return order;
        });
      }

      for (int i = 0; i < 10; i++) {
        System.out.printf("Job %d is done%n", service.take().get());
      }
      pool.shutdown();
    }
  }
}

The output now comes to:

.Job 4 is done
Job 9 is done
.Job 3 is done
Job 8 is done
.Job 2 is done
Job 7 is done
.Job 1 is done
Job 6 is done
.Job 0 is done
Job 5 is done

Nice. However, we do need to remember how many tasks we added to the CompletionService. Instead, we could extend the ExecutorCompletionService and add that functionality:

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class CountingCompletionService<V>
    extends ExecutorCompletionService<V> {
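  // number of tasks that have been handed to submit()
  private final AtomicLong submittedTasks = new AtomicLong();
  // number of finished tasks that have been retrieved with take() or poll()
  private final AtomicLong completedTasks = new AtomicLong();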

  public CountingCompletionService(Executor executor) {
    super(executor);
  }

  public CountingCompletionService(
      Executor executor, BlockingQueue<Future<V>> queue) {
    super(executor, queue);
  }

  @Override
  public Future<V> submit(Callable<V> task) {
    Future<V> future = super.submit(task);
    submittedTasks.incrementAndGet();
    return future;
  }

  @Override
  public Future<V> submit(Runnable task, V result) {
    Future<V> future = super.submit(task, result);
    submittedTasks.incrementAndGet();
    return future;
  }

  @Override
  public Future<V> take() throws InterruptedException {
    Future<V> future = super.take();
    completedTasks.incrementAndGet();
    return future;
  }

  @Override
  public Future<V> poll() {
    Future<V> future = super.poll();
    if (future != null) completedTasks.incrementAndGet();
    return future;
  }

  @Override
  public Future<V> poll(long timeout, TimeUnit unit)
      throws InterruptedException {
    Future<V> future = super.poll(timeout, unit);
    if (future != null) completedTasks.incrementAndGet();
    return future;
  }

  public long getNumberOfCompletedTasks() {
    return completedTasks.get();
  }

  public long getNumberOfSubmittedTasks() {
    return submittedTasks.get();
  }

  public boolean hasUncompletedTasks() {
    return completedTasks.get() < submittedTasks.get();
  }
}

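We can thus replace the loop that iterates over the results. Note that service now has to be declared as a CountingCompletionService<Integer>, since the CompletionService interface does not have the counting methods: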

for (int i = 0; i < service.getNumberOfSubmittedTasks(); i++) {
  System.out.printf("Job %d is done%n", service.take().get());
}
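The hasUncompletedTasks() method gives us another way to write that loop. Here is a sketch, assuming that all tasks are submitted before we start draining and that only one thread retrieves the results. To keep it self-contained, it uses a cut-down stand-in called MiniCountingService, a name I made up, which only keeps the three methods that the loop needs:

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

class HasUncompletedTasksSketch {
  // Cut-down stand-in for CountingCompletionService: only
  // submit(Callable), take() and hasUncompletedTasks() are kept
  static class MiniCountingService<V> extends ExecutorCompletionService<V> {
    private final AtomicLong submittedTasks = new AtomicLong();
    private final AtomicLong completedTasks = new AtomicLong();

    MiniCountingService(Executor executor) {
      super(executor);
    }

    @Override
    public Future<V> submit(Callable<V> task) {
      Future<V> future = super.submit(task);
      submittedTasks.incrementAndGet();
      return future;
    }

    @Override
    public Future<V> take() throws InterruptedException {
      Future<V> future = super.take();
      completedTasks.incrementAndGet();
      return future;
    }

    boolean hasUncompletedTasks() {
      return completedTasks.get() < submittedTasks.get();
    }
  }

  // Submits five jobs and drains them without counting by hand
  static List<Integer> drain() {
    ExecutorService pool = Executors.newCachedThreadPool();
    try {
      MiniCountingService<Integer> service =
          new MiniCountingService<>(pool);
      for (int i = 0; i < 5; i++) {
        int order = i;
        service.submit(() -> order * order);
      }
      List<Integer> results = new ArrayList<>();
      // keep taking until every submitted task has been retrieved
      while (service.hasUncompletedTasks()) {
        results.add(service.take().get());
      }
      return results;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String... args) {
    System.out.println("Results: " + drain());
  }
}
```

Keep in mind that "completed" here means "retrieved by the consumer". If other threads are still submitting while we drain, the loop could stop too early, so this only works when submission has finished first.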

Of course, an even nicer solution would be to build an iterable CompletionService. The loop above could then be replaced with simply:

for (Future<Integer> future : service) {
  System.out.printf("Job %d is done%n", future.get());
}

One of the exercises in my Concurrency Specialist Course is writing such an Iterable. It looks really simple, but there are a few gotchas that you need to be aware of. If you'd like to test your skill, here is an outline of what you need to do:

import java.util.*;
import java.util.concurrent.*;

public class CompletionServiceIterable<V>
    implements Iterable<Future<V>> {
  public CompletionServiceIterable() { // TODO
  }

  public void submit(Callable<V> task) { // TODO
  }

  public Iterator<Future<V>> iterator() { // TODO
  }

  public void shutdown() { // TODO
  }

  public boolean isTerminated() { // TODO
  }
}

If you send me your solution, I'll run it against my unit tests and then give you my personal feedback. You should solve it without using any external libraries, just the JDK (6, 7 or 8) and your own code. Good luck :-)

Kind regards

Heinz

 
