Running on Java 22-ea+15-1134 (Preview)
Home of The JavaSpecialists' Newsletter

206Striped Executor Service

Author: Dr. Heinz M. KabutzDate: 2012-11-13Java Version: 7Category: Concurrency
 

Abstract: We present a new type of ExecutorService that allows users to "stripe" their execution in such a way that all tasks belonging to one stripe are executed in-order.

 

Welcome to the 206th issue of The Java(tm) Specialists' Newsletter, which I started writing en route to W-JAX, followed by a short visit to a customer in Amsterdam. Next week I'm off to Vienna for a fun-filled week of Java Design Patterns. Then we are running our new Concurrency Course in Düsseldorf Germany. Lots of flying at the moment. I might miss the olive oil harvest this year unfortunately. We have about 150 trees and we should get quite a few hundred liters of oil from them this year. Sadly the olive oil price has dropped significantly, to where it is only marginally more precious than heating oil.

javaspecialists.teachable.com: Please visit our new self-study course catalog to see how you can upskill your Java knowledge.

Striped Executor Service

A few months ago, Glenn McGregor sent an email to the Concurrency Interest mailing list, asking whether any of us knew of a thread pool with the ability to execute jobs in-order by stripe. Each Runnable would implement something like his StripedRunner and then get sorted according to its stripeClass.

interface StripedRunner {
  Object getStripeClass();
  void run();
}

This magical thread pool would ensure that all Runnables with the same stripeClass would be executed in the order they were submitted, but StripedRunners with different stripedClasses could still execute independently. He wanted to use a relatively small thread pool to service a large number of Java NIO clients, but in such a way that the runnables would still be executed in-order.

Several suggestions were made, such as having a SingleThreadExecutor for each stripeClass. However, that would not satisfy the requirement that we could share the threads between connections.

My first attempt to solve this challenge was to create a special StripedBlockingQueue that would release objects in order by stripe, and that also would not release new objects from a particular stripe until the previous runnable had been completed. It almost worked, but unfortunately jobs are not always enqueued before being delivered to the worker threads. If there are workers available, they are handed over immediately. Thus this approach could not guarantee in-order execution by runnables within a stripeClass.

In my second attempt, I instead wrote a StripedExecutorService that can run striped tasks in-order. To keep things simple, I decided to use Java 5 locks and conditions to communicate state between threads. I know that this is not as hip as lock-free, non-blocking algorithms, but I always start with a basic design that works and then later refactor it if the need arises. Usually a lock-free algorithms is several factors more effort to get right and does not always guarantee better performance.

In my design I use the SerialExecutor mentioned in the JavaDocs of Executor, kindly pointed out by Joe Bowbeer.

The SerialExecutor unfortunately misses an important component: shutdown. Ask most parents and they will tell you that conceiving a new life is surprisingly effortless. It is easy starting new things, but not so easy winding them down. For example, it took me about two weeks to start Maximum Solutions (Pty) Ltd in South Africa, a company that I ran for 10 years from 1998 until 2008. When it came time to shut down the company, I discovered that this took many years. It has been lying dormant for four years, but still exists, I think. The shutdownNow() method exists, but it involves unpleasantries such as tax audits that would cost me a lot in accounting fees to manage. When we founded our company in Greece, the accountant asked us for how long we wanted our company to run. You can actually decide up front for how many years you want to carry on. I found this a rather curious question. Don't most businessmen want their company to run for a hundred years?

Before we examine the striped executor, we should study the SerialExecutor and understand how it works. Here is the code, with a minor modification in that I call tasks.add() instead of tasks.offer(). If offer() for some reason did not work, it would cause a silent failure, whereas it would be better to see an exception.

public class SerialExecutor implements Executor {
  private final Queue<Runnable> tasks = new ArrayDeque<>();
  private final Executor executor;
  private Runnable active;

  public SerialExecutor(Executor executor) {
    this.executor = executor;
  }

  public synchronized void execute(final Runnable r) {
    tasks.add(new Runnable() {
      public void run() {
        try {
          r.run();
        } finally {
          scheduleNext();
        }
      }
    });
    if (active == null) {
      scheduleNext();
    }
  }

  protected synchronized void scheduleNext() {
    if ((active = tasks.poll()) != null) {
      executor.execute(active);
    }
  }
}

Here is how it works. Let's say that SubmitterThread passes in Runnables R1, R2, R3 and R4, one after the other. When R1 is passed in, it is wrapped with a new Runnable that will call the R1.run() method and then allow the next Runnable in the tasks queue to be executed. Let's call the new Runnable R1*. Since there is no other Runnable currently active (thus active==null), the SubmitterThread also calls scheduleNext(), which pulls R1* out of the queue and submits it to the wrapped Executor. The SubmitterThread can now call execute() with R2, R3 and R4, which will wrap these Runnables also with our special Runnable and submit them to our tasks queue.

Inside R1*, once R1 is completed, the scheduleNext() method is invoked. This will pull R2* from the tasks list and submit it to the wrapped Executor. This will continue until R4*, which will call scheduleNext(), but that will simply set active to null.

The SerialExecutor implements only the Executor interface, which means we cannot submit Callables, nor can we shut it down cleanly. In this newsletter, we will expand this mechanism to be fronted by an ExecutorService that will be more flexible in its design and also allow us to shut it down cleanly.

Here is a short example of the SerialExecutor at work. We use the Phaser to signal when the jobs have been completed, otherwise we do not know when to return from the test() method. Before we add a new Runnable to the executor, we register an additional party to the phaser. Each time one of the Runnables is done, it sends a message to the phaser with the "arrive()" method. The phaser.arriveAndAwaitAdvance() will do so uninterruptibly.

public class SerialExecutorExample {
  private static final int UPTO = 10;

  public static void main(String[] args) {
    ExecutorService cached = Executors.newCachedThreadPool();
    test(new SerialExecutor(cached));
    test(cached);
    cached.shutdown();
  }

  private static void test(Executor executor) {
    final Vector<Integer> call_sequence = new Vector<>();
    final Phaser phaser = new Phaser(1);
    for(int i=0; i < UPTO; i++) {
      phaser.register();
      final int tempI = i;
      executor.execute(new Runnable() {
        public void run() {
          try {
            TimeUnit.MILLISECONDS.sleep(
                ThreadLocalRandom.current().nextInt(2, 10)
            );
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          call_sequence.add(tempI);
          phaser.arrive();
        }
      });
    }
    // we need to wait until all the jobs are done
    phaser.arriveAndAwaitAdvance();
    System.out.println(call_sequence);
  }
}

When I run the SerialExecutorExample, I get results such as these:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[2, 4, 1, 7, 0, 8, 5, 9, 6, 3]
StripedExecutor Solution

In our solution, instead of having to maintain one SerialExecutor per stripe of execution, we allow users to submit objects that fall into a certain stripe. We accept Runnable and Callable.

First we have three interfaces that we can implement in order to indicate which stripe our runnable belongs to, starting with the StripedObject. We decide on the stripe by identity, rather than the actual hash code of the object.

public interface StripedObject {
  Object getStripe();
}

In my implementation, you can mix Callable and Runnable objects. As long as they belong to the same stripe, they will be executed in-order.

public interface StripedRunnable extends Runnable, StripedObject{
}

And of course we can also submit Callables to our pool:

public interface StripedCallable<V> extends Callable<V>,
    StripedObject {
}

The StripedExecutorService accepts Runnable/Callable objects, which may also implement the StripedObject interface. If they do, then they will be executed in order for their stripe. If they do not, that is, they are ordinary Runnable/Callable tasks, we immediately pass them on to the wrapped Executor Service.

As mentioned earlier, we use the SerialExecutor to invoke all the tasks for a particular stripe in-order. The stripe association with the SerialExecutor is maintained in an IdentityHashMap. In order to avoid a memory leak, we remove the SerialExecutor as soon as all tasks for that stripe are completed.

/**
 * The StripedExecutorService accepts Runnable/Callable objects
 * that also implement the StripedObject interface.  It executes
 * all the tasks for a single "stripe" consecutively.
 *
 * In this version, submitted tasks do not necessarily have to
 * implement the StripedObject interface.  If they do not, then
 * they will simply be passed onto the wrapped ExecutorService
 * directly.
 *
 * Idea inspired by Glenn McGregor on the Concurrency-interest
 * mailing list and using the SerialExecutor presented in the
 * Executor interface's JavaDocs.
 *
 * https://gee.cs.oswego.edu/dl/concurrency-interest/
 *
 * @author Dr Heinz M. Kabutz
 */
public class StripedExecutorService extends AbstractExecutorService {
  /**
   * The wrapped ExecutorService that will actually execute our
   * tasks.
   */
  private final ExecutorService executor;

  /**
   * The lock prevents shutdown from being called in the middle
   * of a submit.  It also guards the executors IdentityHashMap.
   */
  private final ReentrantLock lock = new ReentrantLock();

  /**
   * This condition allows us to cleanly terminate this executor
   * service.
   */
  private final Condition terminating = lock.newCondition();

  /**
   * Whenever a new StripedObject is submitted to the pool, it
   * is added to this IdentityHashMap.  As soon as the
   * SerialExecutor is empty, the entry is removed from the map,
   * in order to avoid a memory leak.
   */
  private final Map<Object, SerialExecutor> executors =
      new IdentityHashMap<>();

  /**
   * The default submit() method creates a new FutureTask and
   * wraps our StripedRunnable with it.  We thus need to
   * remember the stripe object somewhere.  In our case, we will
   * do this inside the ThreadLocal "stripes".  Before the
   * thread returns from submitting the runnable, it will always
   * remove the thread local entry.
   */
  private static final ThreadLocal<Object> stripes =
      new ThreadLocal<>();

  /**
   * Valid states are RUNNING and SHUTDOWN.  We rely on the
   * underlying executor service for the remaining states.
   */
  private State state = State.RUNNING;

  private static enum State {
    RUNNING, SHUTDOWN
  }

  /**
   * The constructor taking executors is private, since we do
   * not want users to shutdown their executors directly,
   * otherwise jobs might get stuck in our queues.
   *
   * @param executor the executor service that we use to execute
   *                 the tasks
   */
  private StripedExecutorService(ExecutorService executor) {
    this.executor = executor;
  }

  /**
   * This constructs a StripedExecutorService that wraps a
   * cached thread pool.
   */
  public StripedExecutorService() {
    this(Executors.newCachedThreadPool());
  }

  /**
   * This constructs a StripedExecutorService that wraps a fixed
   * thread pool with the given number of threads.
   */
  public StripedExecutorService(int numberOfThreads) {
    this(Executors.newFixedThreadPool(numberOfThreads));
  }

  /**
   * If the runnable also implements StripedObject, we store the
   * stripe object in a thread local, since the actual runnable
   * will be wrapped with a FutureTask.
   */
  protected <T> RunnableFuture<T> newTaskFor(
      Runnable runnable, T value) {
    saveStripedObject(runnable);
    return super.newTaskFor(runnable, value);
  }

  /**
   * If the callable also implements StripedObject, we store the
   * stripe object in a thread local, since the actual callable
   * will be wrapped with a FutureTask.
   */
  protected <T> RunnableFuture<T> newTaskFor(
      Callable<T> callable) {
    saveStripedObject(callable);
    return super.newTaskFor(callable);
  }

  /**
   * Saves the stripe in a ThreadLocal until we can use it to
   * schedule the task into our pool.
   */
  private void saveStripedObject(Object task) {
    if (isStripedObject(task)) {
      stripes.set(((StripedObject) task).getStripe());
    }
  }

  /**
   * Returns true if the object implements the StripedObject
   * interface.
   */
  private static boolean isStripedObject(Object o) {
    return o instanceof StripedObject;
  }

  /**
   * Delegates the call to submit(task, null).
   */
  public Future<?> submit(Runnable task) {
    return submit(task, null);
  }

  /**
   * If the task is a StripedObject, we execute it in-order by
   * its stripe, otherwise we submit it directly to the wrapped
   * executor.  If the pool is not running, we throw a
   * RejectedExecutionException.
   */
  public <T> Future<T> submit(Runnable task, T result) {
    lock.lock();
    try {
      checkPoolIsRunning();
      if (isStripedObject(task)) {
        return super.submit(task, result);
      } else { // bypass the serial executors
        return executor.submit(task, result);
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * If the task is a StripedObject, we execute it in-order by
   * its stripe, otherwise we submit it directly to the wrapped
   * executor.  If the pool is not running, we throw a
   * RejectedExecutionException.
   */
  public <T> Future<T> submit(Callable<T> task) {
    lock.lock();
    try {
      checkPoolIsRunning();
      if (isStripedObject(task)) {
        return super.submit(task);
      } else { // bypass the serial executors
        return executor.submit(task);
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * Throws a RejectedExecutionException if the state is not
   * RUNNING.
   */
  private void checkPoolIsRunning() {
    assert lock.isHeldByCurrentThread();
    if (state != State.RUNNING) {
      throw new RejectedExecutionException(
          "executor not running");
    }
  }

  /**
   * Executes the command.  If command implements StripedObject,
   * we execute it with a SerialExecutor.  This method can be
   * called directly by clients or it may be called by the
   * AbstractExecutorService's submit() methods. In that case,
   * we check whether the stripes thread local has been set.  If
   * it is, we remove it and use it to determine the
   * StripedObject and execute it with a SerialExecutor.  If no
   * StripedObject is set, we instead pass the command to the
   * wrapped ExecutorService directly.
   */
  public void execute(Runnable command) {
    lock.lock();
    try {
      checkPoolIsRunning();
      Object stripe = getStripe(command);
      if (stripe != null) {
        SerialExecutor ser_exec = executors.get(stripe);
        if (ser_exec == null) {
          executors.put(stripe, ser_exec =
              new SerialExecutor(stripe));
        }
        ser_exec.execute(command);
      } else {
        executor.execute(command);
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * We get the stripe object either from the Runnable if it
   * also implements StripedObject, or otherwise from the thread
   * local temporary storage.  Result may be null.
   */
  private Object getStripe(Runnable command) {
    Object stripe;
    if (command instanceof StripedObject) {
      stripe = (((StripedObject) command).getStripe());
    } else {
      stripe = stripes.get();
    }
    stripes.remove();
    return stripe;
  }

  /**
   * Shuts down the StripedExecutorService.  No more tasks will
   * be submitted.  If the map of SerialExecutors is empty, we
   * shut down the wrapped executor.
   */
  public void shutdown() {
    lock.lock();
    try {
      state = State.SHUTDOWN;
      if (executors.isEmpty()) {
        executor.shutdown();
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * All the tasks in each of the SerialExecutors are drained
   * to a list, as well as the tasks inside the wrapped
   * ExecutorService.  This is then returned to the user.  Also,
   * the shutdownNow method of the wrapped executor is called.
   */
  public List<Runnable> shutdownNow() {
    lock.lock();
    try {
      shutdown();
      List<Runnable> result = new ArrayList<>();
      for (SerialExecutor ser_ex : executors.values()) {
        ser_ex.tasks.drainTo(result);
      }
      result.addAll(executor.shutdownNow());
      return result;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Returns true if shutdown() or shutdownNow() have been
   * called; false otherwise.
   */
  public boolean isShutdown() {
    lock.lock();
    try {
      return state == State.SHUTDOWN;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Returns true if this pool has been terminated, that is, all
   * the SerialExecutors are empty and the wrapped
   * ExecutorService has been terminated.
   */
  public boolean isTerminated() {
    lock.lock();
    try {
      if (state == State.RUNNING) return false;
      for (SerialExecutor executor : executors.values()) {
        if (!executor.isEmpty()) return false;
      }
      return executor.isTerminated();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Returns true if the wrapped ExecutorService terminates
   * within the allotted amount of time.
   */
  public boolean awaitTermination(long timeout, TimeUnit unit)
      throws InterruptedException {
    lock.lock();
    try {
      long waitUntil = System.nanoTime() + unit.toNanos(timeout);
      long remainingTime;
      while ((remainingTime = waitUntil - System.nanoTime()) > 0
          && !executors.isEmpty()) {
        terminating.awaitNanos(remainingTime);
      }
      if (remainingTime <= 0) return false;
      if (executors.isEmpty()) {
        return executor.awaitTermination(
            remainingTime, TimeUnit.NANOSECONDS);
      }
      return false;
    } finally {
      lock.unlock();
    }
  }

  /**
   * As soon as a SerialExecutor is empty, we remove it from the
   * executors map.  We might thus remove the SerialExecutors
   * more quickly than necessary, but at least we can avoid a
   * memory leak.
   */
  private void removeEmptySerialExecutor(Object stripe,
                                         SerialExecutor ser_ex) {
    assert ser_ex == executors.get(stripe);
    assert lock.isHeldByCurrentThread();
    assert ser_ex.isEmpty();

    executors.remove(stripe);
    terminating.signalAll();
    if (state == State.SHUTDOWN && executors.isEmpty()) {
      executor.shutdown();
    }
  }

  /**
   * This field is used for conditional compilation.  If it is
   * false, then the finalize method is an empty method, in
   * which case the SerialExecutor will not be registered with
   * the Finalizer.
   */
  private static boolean DEBUG = false;

  /**
   * SerialExecutor is based on the construct with the same name
   * described in the {@link Executor} JavaDocs.  The difference
   * with our SerialExecutor is that it can be terminated.  It
   * also removes itself automatically once the queue is empty.
   */
  private class SerialExecutor implements Executor {
    /**
     * The queue of unexecuted tasks.
     */
    private final BlockingQueue<Runnable> tasks =
        new LinkedBlockingQueue<>();
    /**
     * The runnable that we are currently busy with.
     */
    private Runnable active;
    /**
     * The stripe that this SerialExecutor was defined for.  It
     * is needed so that we can remove this executor from the
     * map once it is empty.
     */
    private final Object stripe;

    /**
     * Creates a SerialExecutor for a particular stripe.
     */
    private SerialExecutor(Object stripe) {
      this.stripe = stripe;
      if (DEBUG) {
        System.out.println("SerialExecutor created " + stripe);
      }
    }

    /**
     * We use finalize() only for debugging purposes.  If
     * DEBUG==false, the body of the method will be compiled
     * away, thus rendering it a trivial finalize() method,
     * which means that the object will not incur any overhead
     * since it won't be registered with the Finalizer.
     */
    protected void finalize() throws Throwable {
      if (DEBUG) {
        System.out.println("SerialExecutor finalized " + stripe);
        super.finalize();
      }
    }

    /**
     * For every task that is executed, we add() a wrapper to
     * the queue of tasks that will run the current task and
     * then schedule the next task in the queue.
     */
    public void execute(final Runnable r) {
      lock.lock();
      try {
        tasks.add(new Runnable() {
          public void run() {
            try {
              r.run();
            } finally {
              scheduleNext();
            }
          }
        });
        if (active == null) {
          scheduleNext();
        }
      } finally {
        lock.unlock();
      }
    }

    /**
     * Schedules the next task for this stripe.  Should only be
     * called if active == null or if we are finished executing
     * the currently active task.
     */
    private void scheduleNext() {
      lock.lock();
      try {
        if ((active = tasks.poll()) != null) {
          executor.execute(active);
          terminating.signalAll();
        } else {
          removeEmptySerialExecutor(stripe, this);
        }
      } finally {
        lock.unlock();
      }
    }

    /**
     * Returns true if the list is empty and there is no task
     * currently executing.
     */
    public boolean isEmpty() {
      lock.lock();
      try {
        return active == null && tasks.isEmpty();
      } finally {
        lock.unlock();
      }
    }
  }
}

A full test suite is available in my git repository in StripedExecutorServiceTest. However, here is a small example that shows how the StripedExecutorService works:

public class StripedExecutorServiceExample {
  private static final int UPTO = 10;

  public static void main(String[] args)
      throws InterruptedException {
    test(new StripedExecutorService());
    test(Executors.newCachedThreadPool());
  }

  private static void test(ExecutorService pool)
      throws InterruptedException {
    final Vector<Integer> call_sequence = new Vector<>();
    for (int i = 0; i < UPTO; i++) {
      final int tempI = i;
      pool.submit(new StripedCallable<Void>() {
        public Void call() throws Exception {
          TimeUnit.MILLISECONDS.sleep(
              ThreadLocalRandom.current().nextInt(2, 10)
          );
          call_sequence.add(tempI);
          return null;
        }

        public Object getStripe() {
          return call_sequence;
        }
      });
    }
    pool.shutdown();
    while (!pool.awaitTermination(1, TimeUnit.SECONDS)) ;
    System.out.println(call_sequence);
  }
}

We could now have several "stripes" of execution inside one ExecutorService. In addition, we are able to shut it down cleanly.

Kind regards

Heinz

 

Comments

We are always happy to receive comments from our readers. Feel free to send me a comment via email or discuss the newsletter in our JavaSpecialists Slack Channel (Get an invite here)

When you load these comments, you'll be connected to Disqus. Privacy Statement.

Related Articles

Browse the Newsletter Archive

About the Author

Heinz Kabutz Java Conference Speaker

Java Champion, author of the Javaspecialists Newsletter, conference speaking regular... About Heinz

Superpack '23

Superpack '23 Our entire Java Specialists Training in one huge bundle more...

Free Java Book

Dynamic Proxies in Java Book
Java Training

We deliver relevant courses, by top Java developers to produce more resourceful and efficient programmers within their organisations.

Java Consulting

We can help make your Java application run faster and trouble-shoot concurrency and performance bugs...