
The Java Specialists' Newsletter
Issue 154, 2007-12-04. Category: Tips and Tricks. Java version: 5+


ResubmittingScheduledPoolExecutor

by Dr. Heinz M. Kabutz
Abstract:
Timers in Java have suffered from the typical Command Pattern characteristics. Exceptions could stop the timer altogether, and even with the new ScheduledThreadPoolExecutor, a task that fails is cancelled. In this newsletter we explore how we could reschedule periodic tasks automatically.

Welcome to the 154th issue of The Java(tm) Specialists' Newsletter, which I started writing at 36000 feet flying in a brand new Airbus 320-200 en route to Frankfurt Sun Tech Days. Lovely plane, nice and smooth ride, probably partly due to the altitude it was flying at. The idea for this newsletter came from one of the participants of my recent talk in London on the Secrets of Concurrency; unfortunately this particular talk was not accessible to the general public.

Attention New Yorkers: I have been invited to present a talk on the Secrets of Concurrency at the NYC Java SIG on the 11th of December 2007. If you are in the area and available that evening, it would be great to meet you! Their website says that you need to RSVP. The meeting place is unfortunately not that large, so I would strongly encourage you to let them know if you would like to come.


ResubmittingScheduledPoolExecutor

In the early days of Java, when you needed a timer to periodically do some task, you would create a thread, do the task, sleep for some set time, and repeat ad infinitum. If an Error or RuntimeException occurred during the execution of the task, the thread would typically die, also stopping the scheduled task. It suffered from another problem, in that we needed a new thread for every scheduled task in the system. Most of these threads would sit around idle most of the time.
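
As a rough illustration of that hand-rolled approach, here is a minimal sketch. The class and method names (SleepLoopTimerDemo, startPeriodic, runsBeforeDeath) are made up for this example. The task deliberately fails on its third run, which kills its dedicated thread, so the "periodic" task silently stops.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SleepLoopTimerDemo {
  /** Starts a dedicated thread that runs the task, sleeps, and repeats. */
  static Thread startPeriodic(final Runnable task,
                              final long periodMillis) {
    Thread thread = new Thread(new Runnable() {
      public void run() {
        try {
          while (true) {
            task.run(); // an unchecked exception here kills the thread
            Thread.sleep(periodMillis);
          }
        } catch (InterruptedException e) {
          // interrupted: leave the loop and let the thread end
        }
      }
    });
    thread.start();
    return thread;
  }

  /** Returns how many times the task ran before its thread died. */
  static int runsBeforeDeath() {
    final AtomicInteger runs = new AtomicInteger();
    Thread thread = startPeriodic(new Runnable() {
      public void run() {
        if (runs.incrementAndGet() == 3) {
          throw new IllegalStateException("task failed");
        }
      }
    }, 10);
    try {
      thread.join(); // returns once the exception has killed the thread
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return runs.get();
  }

  public static void main(String[] args) {
    System.out.println("runs = " + runsBeforeDeath()); // runs = 3
  }
}
```

The stack trace of the IllegalStateException is printed by the default uncaught exception handler; after that, nothing ever runs the task again.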

Enter JDK 1.3. With this version, we had a java.util.Timer class that shared one thread between lots of different periodic tasks. We therefore have fewer unnecessary threads, but this brings with it other problems: First off, if one of the tasks causes an Error or a RuntimeException, the Timer thread dies, so none of the other tasks make progress either. Here is a demonstration program:

import java.util.*;

public class TimerTest {
  public static void main(String[] args) {
    Timer timer = new Timer();
    for(int i=0; i<5; i++) {
      final int i1 = i;
      timer.schedule(new TimerTask() {
        public void run() {
          System.out.println("i = " + i1);
          if (Math.random() < 0.1) {
            throw new RuntimeException();
          }
        }
      }, 1000, 1000);
    }
  }
}
  

Secondly, if a task takes a long time to complete, the other tasks might be held up longer than desired, since the Timer uses only one thread to execute all of its tasks.

These problems were solved in Java 5 with the ScheduledThreadPoolExecutor class. First off, when one task misbehaves by causing unchecked exceptions, only it gets cancelled and the thread pool continues execution. Secondly, since you can create this ExecutorService with several threads, the likelihood that one task is blocking others is decreased.
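
The following sketch shows that behaviour under simple assumptions: two periodic tasks share a pool of two threads, one of them throws on its first run, and the other keeps going. The class name CancelledOnFailureDemo and the 10 millisecond period are arbitrary choices for this illustration.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class CancelledOnFailureDemo {
  /** Returns {runs of the failing task, runs of the healthy task}. */
  static int[] runDemo() {
    ScheduledExecutorService pool = new ScheduledThreadPoolExecutor(2);
    final AtomicInteger failingRuns = new AtomicInteger();
    final AtomicInteger healthyRuns = new AtomicInteger();
    final CountDownLatch healthyDone = new CountDownLatch(5);

    ScheduledFuture<?> failing = pool.scheduleAtFixedRate(
        new Runnable() {
          public void run() {
            failingRuns.incrementAndGet();
            throw new IllegalStateException("first run fails");
          }
        }, 0, 10, TimeUnit.MILLISECONDS);
    pool.scheduleAtFixedRate(
        new Runnable() {
          public void run() {
            healthyRuns.incrementAndGet();
            healthyDone.countDown();
          }
        }, 0, 10, TimeUnit.MILLISECONDS);

    try {
      failing.get(); // blocks until the periodic task has failed
    } catch (ExecutionException e) {
      System.out.println("Failing task stopped: " + e.getCause());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    try {
      healthyDone.await(); // the healthy task is still being executed
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    pool.shutdownNow();
    return new int[] {failingRuns.get(), healthyRuns.get()};
  }

  public static void main(String[] args) {
    int[] runs = runDemo();
    System.out.println("failing task ran " + runs[0] + " time(s)");
    System.out.println("healthy task ran " + runs[1] + " time(s)");
  }
}
```

The failing task runs exactly once, since subsequent executions are suppressed after the exception, whereas the healthy task reaches at least five runs.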

According to the definition in Goetz, livelock is a form of liveness failure in which a thread, while not blocked, still cannot make progress because it keeps retrying an operation that will always fail. This form of livelock often comes from overeager error-recovery code that mistakenly treats an unrecoverable error as a recoverable one.

Therefore, cancelling the task that causes an exception is the only safe and sensible thing to do in general; otherwise we could potentially create a livelock. However, there are cases where we might want to reschedule a task that has failed.

By extending the ScheduledThreadPoolExecutor class and overriding the afterExecute(Runnable,Throwable) method, we are able to immediately discover when a task has failed. Unfortunately the Runnable that we see in the afterExecute() method parameter is not the same as the submitted Runnable. It is a wrapper object that does not allow us to obtain the original submitted task. However, it is the same object as is returned by the schedule() methods. We also need to know what additional parameters were submitted to the ScheduledThreadPoolExecutor so that we can resubmit the task.
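
A small experiment can make this observation concrete. The sketch below records the first Runnable passed to afterExecute() and compares it with both the submitted task and the returned future. The names WrapperIdentityDemo and afterExecuteSeesReturnedFuture are invented for this illustration.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

class WrapperIdentityDemo {
  /** Returns true if afterExecute() saw the future we were handed. */
  static boolean afterExecuteSeesReturnedFuture() {
    final AtomicReference<Runnable> seen =
        new AtomicReference<Runnable>();
    final CountDownLatch executed = new CountDownLatch(1);
    ScheduledThreadPoolExecutor pool =
        new ScheduledThreadPoolExecutor(1) {
          protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            seen.compareAndSet(null, r); // keep the first wrapper
            executed.countDown();
          }
        };
    Runnable submitted = new Runnable() {
      public void run() {
        // nothing to do, we only care about the wrapper
      }
    };
    ScheduledFuture<?> future = pool.scheduleAtFixedRate(
        submitted, 0, 10, TimeUnit.MILLISECONDS);
    try {
      executed.await(); // wait for the first execution to finish
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    pool.shutdownNow();
    Object wrapper = seen.get();
    // same object as the returned future, not the submitted Runnable
    return wrapper == future && wrapper != submitted;
  }

  public static void main(String[] args) {
    System.out.println(afterExecuteSeesReturnedFuture());
  }
}
```

Since the wrapper and the returned future are one and the same object, the future can serve as a key for looking up whatever we remembered about the submitted task.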

As a start, we define a ScheduledExceptionHandler interface that is notified when an exception occurs. The exceptionOccurred() method then needs to return true if the task must be rescheduled; false otherwise.

public interface ScheduledExceptionHandler {
  /**
   * @return true if the task should be rescheduled;
   *         false otherwise
   */
  boolean exceptionOccurred(Throwable e);
}
  

The subclass now uses that to bind the cancelled task to the submitted task. We use an IdentityHashMap so that the futures are looked up by object identity (==) rather than by equals() and hashCode(). Something that helps us here is that the Future.isDone() method returns true if a task has been cancelled.

import java.util.*;
import java.util.concurrent.*;

public class ResubmittingScheduledThreadPoolExecutor
    extends ScheduledThreadPoolExecutor {

  /** Default exception handler, always reschedules */
  private static final ScheduledExceptionHandler NULL_HANDLER =
      new ScheduledExceptionHandler() {
        public boolean exceptionOccurred(Throwable e) {
          return true;
        }
      };
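  // Accessed by submitting threads and by pool threads, so the
  // identity map is wrapped in a synchronized view
  private final Map<Object, FixedRateParameters> runnables =
      Collections.synchronizedMap(
          new IdentityHashMap<Object, FixedRateParameters>());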
  private final ScheduledExceptionHandler handler;

  /**
   * Creates an executor that always reschedules a task that
   * was aborted by an exception.
   *
   * @param poolSize the number of threads to keep in the pool
   */
  public ResubmittingScheduledThreadPoolExecutor(int poolSize) {
    this(poolSize, NULL_HANDLER);
  }

  public ResubmittingScheduledThreadPoolExecutor(
      int poolSize, ScheduledExceptionHandler handler) {
    super(poolSize);
    this.handler = handler;
  }

  private static class FixedRateParameters {
    private final Runnable command;
    private final long period;
    private final TimeUnit unit;

    /**
     * We do not need initialDelay, since we can set it to period
     */
    public FixedRateParameters(Runnable command, long period,
                               TimeUnit unit) {
      this.command = command;
      this.period = period;
      this.unit = unit;
    }
  }

  public ScheduledFuture<?> scheduleAtFixedRate(
      Runnable command, long initialDelay, long period,
      TimeUnit unit) {
    ScheduledFuture<?> future = super.scheduleAtFixedRate(
        command, initialDelay, period, unit);
    runnables.put(future,
        new FixedRateParameters(command, period, unit));
    return future;
  }

  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    ScheduledFuture<?> future = (ScheduledFuture<?>) r;
    // future.isDone() is always false for periodic tasks,
    // unless the task failed or was cancelled
    if (future.isDone()) {
      try {
        future.get();
      } catch (CancellationException e) {
        // cancelled by the caller, so we forget about the task
        runnables.remove(r);
      } catch (ExecutionException e) {
        Throwable problem = e.getCause();
        FixedRateParameters parms = runnables.remove(r);
        if (problem != null && parms != null) {
          boolean resubmitThisTask =
              handler.exceptionOccurred(problem);
          if (resubmitThisTask) {
            // the period doubles as the new initial delay
            scheduleAtFixedRate(parms.command, parms.period,
                parms.period, parms.unit);
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
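
The choice of IdentityHashMap in the class above can be illustrated with a small standalone sketch. It stores two equal but distinct String objects as keys, first in a HashMap and then in an IdentityHashMap. The class name IdentityMapDemo is made up for this example, and Strings merely stand in for the futures.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

class IdentityMapDemo {
  /** Returns {size of the HashMap, size of the IdentityHashMap}. */
  static int[] sizes() {
    String a = new String("task");
    String b = new String("task"); // equals(a), but a distinct object

    Map<Object, String> byEquals = new HashMap<Object, String>();
    Map<Object, String> byIdentity =
        new IdentityHashMap<Object, String>();

    byEquals.put(a, "first");
    byEquals.put(b, "second");   // replaces the entry for a
    byIdentity.put(a, "first");
    byIdentity.put(b, "second"); // kept as a separate entry

    return new int[] {byEquals.size(), byIdentity.size()};
  }

  public static void main(String[] args) {
    int[] sizes = sizes();
    System.out.println("HashMap size = " + sizes[0]);         // 1
    System.out.println("IdentityHashMap size = " + sizes[1]); // 2
  }
}
```

With identity lookups, we never depend on how a key class happens to implement equals() and hashCode(); a key only matches the very same object that was stored.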
  

We can see how this works by looking at an example that resubmits a failing task until its fifth failure and then gives up. Such a system would be more robust than just giving up after the first failure, and since we put a limit on the retries, we also avoid livelock issues.

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ResubmittingTest {
  public static void main(String[] args)
      throws InterruptedException {
    ScheduledExecutorService service2 =
        new ResubmittingScheduledThreadPoolExecutor(
            5, new MyScheduledExceptionHandler());
    service2.scheduleAtFixedRate(
        new MyRunnable(), 2, 1, TimeUnit.SECONDS);
  }

  private static class MyRunnable implements Runnable {
    public void run() {
      if (Math.random() < 0.3) {
        System.out.println("I have a problem");
        throw new IllegalArgumentException("I have a problem");
      }
      System.out.println("I'm happy");
    }
  }

  /** As an example, we give up after 5 failures. */
  private static class MyScheduledExceptionHandler
      implements ScheduledExceptionHandler {
    private AtomicInteger problems = new AtomicInteger();

    public boolean exceptionOccurred(Throwable e) {
      e.printStackTrace();
      if (problems.incrementAndGet() >= 5) {
        System.err.println("We give up!");
        return false;
      }
      System.err.println("Resubmitting task to scheduler");
      return true;
    }
  }
}
  

That's all for now - hope you enjoyed this newsletter and will find it useful in your work!

Heinz

P.S. Another solution to managing exceptions is to catch them in the tasks themselves, thus not letting them bubble up. In our case we were looking for a general solution to the cancelled task problems.
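
A minimal sketch of that alternative, assuming that it is acceptable to log the exception and carry on: the hypothetical catching() helper below wraps a Runnable so that a RuntimeException never reaches the scheduler. Errors are deliberately left to propagate. The class and method names are invented for this example.

```java
import java.util.concurrent.*;

class CatchInsideTaskDemo {
  /** Wraps a task so that RuntimeExceptions do not escape from run(). */
  static Runnable catching(final Runnable task) {
    return new Runnable() {
      public void run() {
        try {
          task.run();
        } catch (RuntimeException e) {
          System.err.println("Task failed, staying scheduled: " + e);
        }
      }
    };
  }

  /** Returns true if a task that always fails is still scheduled. */
  static boolean survivesFailures() {
    ScheduledExecutorService pool =
        Executors.newScheduledThreadPool(1);
    final CountDownLatch runs = new CountDownLatch(4);
    ScheduledFuture<?> future = pool.scheduleAtFixedRate(
        catching(new Runnable() {
          public void run() {
            runs.countDown();
            throw new IllegalStateException("always fails");
          }
        }), 0, 10, TimeUnit.MILLISECONDS);
    boolean ranFourTimes = false;
    try {
      ranFourTimes = runs.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    boolean stillScheduled = !future.isDone();
    pool.shutdownNow();
    return ranFourTimes && stillScheduled;
  }

  public static void main(String[] args) {
    System.out.println(survivesFailures());
  }
}
```

The drawback is that every task has to be wrapped, or has to contain the try-catch itself, which is why a general solution at the executor level can be attractive.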




© 2010-2014 Heinz Kabutz - All Rights Reserved