|
The Java Specialists' Newsletter
Issue 154 2007-12-04
Category:
Tips and Tricks
Java version: 5+ ResubmittingScheduledPoolExecutorby Dr. Heinz M. KabutzAbstract:
Timers in Java have suffered from the typical Command
Pattern characteristics. Exceptions could stop the timer
altogether and even with the new ScheduledPoolExecutor,
a task that fails is cancelled. In this newsletter we
explore how we could reschedule periodic tasks
automatically.
Welcome to the 154th issue of The Java(tm) Specialists' Newsletter, which I started
writing at 36000 feet flying in a brand new Airbus 320-200
en route to Frankfurt Sun Tech Days. Lovely plane, nice and
smooth ride, probably partly due to the altitude it was
flying at. The idea for this newsletter came from one of the
participants of my recent talk in London on the Secrets of
Concurrency; unfortunately this particular talk was not
accessible to the general public.
Attention New Yorkers: On the 11th of December 2007, I
have been invited to present a talk at the NYC Java SIG on
the Secrets of Concurrency. If you are in the area and
available that evening, it would be great to meet you! On
the website they say that you need to RSVP. Our meeting
place is unfortunately not that large, so I would strongly
encourage you to let them know if you would like to come.
Upcoming Java Specialist Master Courses:
- please click here to sign up.
As from May 2010, we are also offering this course on the island of Crete. We
only accept 6 students per class in Crete, due to the size of our conference
room. Please book early to avoid disappointment!
San Jose CA, Mar 16-19 2010, $3500 Ottawa, Canada, Mar 22-25 2010, $3500 Oslo, Norway, Apr 13-16 2010, Kr 24500 Montreal, Canada, Apr 20-23 2010, $3500 Toronto, Canada, May 17-20 2010, $3500 Chania, Crete, May 25-28, Jun 29-Jul 2 or Aug 24-27 2010, €2500
In-house courses if these dates or locations do not suit you - click here for more information. ResubmittingScheduledPoolExecutor
In the early days of Java, when you needed a timer to
periodically do some task, you would create a thread, do the
task, sleep for some set time, and repeat ad infinitum. If
an Error or RuntimeException occurred during the execution of
the task, the thread would typically die, also stopping the
scheduled task. It suffered from another problem, in that we
needed a new thread for every scheduled task in the system.
Most of these threads would sit around idle most of the time.
Enter JDK 1.3. With this version, we had a java.util.Timer
class that shared one thread between lots of different
periodic tasks. We therefore have less unnecessary threads,
but this brings with it other problems: First off, if one of
the tasks causes an Error or a RuntimeException, the
TimerTask thread dies, thus none of the other tasks progress
either. Here is a demonstration program:
import java.util.*;
public class TimerTest {
public static void main(String[] args) {
Timer timer = new Timer();
for(int i=0; i<5; i++) {
final int i1 = i;
timer.schedule(new TimerTask() {
public void run() {
System.out.println("i = " + i1);
if (Math.random() < 0.1) {
throw new RuntimeException();
}
}
}, 1000, 1000);
}
}
}
Secondly, if a task takes a long time to complete, the
other tasks might be held-up longer than desired, since the
TimerTask only uses one thread to execute the tasks.
These problems were solved in Java 5 with the
ScheduledThreadPoolExecutor class. First off, when one task
misbehaves by causing unchecked exceptions, only it gets
cancelled and the thread pool continues execution. Secondly,
since you can create this ExecutorService with several
threads, the likelihood that one task is blocking others is
decreased.
According to the definition in
Goetz, livelock is a form of
liveness failure in which a thread, while not blocked, still
cannot make progress because it keeps retrying an operation
that will always fail. This form of livelock often comes
from overeager errorrecovery code that mistakenly treats an
unrecoverable error as a recoverable one.
Therefore, cancelling the task that causes an exception is
the only safe and sensible thing to do in general; otherwise
we could potentially create a livelock. However, there are
cases where we might want to reschedule a task that has
failed.
By extending the ScheduledThreadPoolExecutor
class and overriding the
afterExecute(Runnable,Throwable) method, we are
able to immediately discover when a task has failed.
Unfortunately the Runnable that we see in the
afterExecute() method parameter is not the same
as the submitted Runnable. It is a wrapper object, that does
not allow us to obtain the original submitted task. However,
it is the same object as is returned by the schedule()
methods. We also need to know what additional parameters
were submitted to the
ScheduledThreadPoolExecutor so that we can
resubmit it.
As a start, we define a ScheduledExceptionHandler
interface that is notified when an exception occurs. The
exceptionOccurred() method then needs to return
true if the task must be rescheduled;
false otherwise.
public interface ScheduledExceptionHandler {
/**
* @return true if the task should be rescheduled;
* false otherwise
*/
boolean exceptionOccurred(Throwable e);
}
The overridden class now uses that to bind the cancelled task
to the submitted task. We us an IdentityHashMap to make sure
that we are comparing actual objects, not their calculated
hash code. Something which can help us is that the
Future.isDone() method returns true if a
task has been cancelled.
import java.util.*;
import java.util.concurrent.*;
public class ResubmittingScheduledThreadPoolExecutor
extends ScheduledThreadPoolExecutor {
/** Default exception handler, always reschedules */
private static final ScheduledExceptionHandler NULL_HANDLER =
new ScheduledExceptionHandler() {
public boolean exceptionOccurred(Throwable e) {
return true;
}
};
private final Map<Object, FixedRateParameters> runnables =
new IdentityHashMap<Object, FixedRateParameters>();
private final ScheduledExceptionHandler handler;
/**
* @param reschedule when an exception causes a task to be
* aborted, reschedule it and notify the
* exception listener
*/
public ResubmittingScheduledThreadPoolExecutor(int poolSize) {
this(poolSize, NULL_HANDLER);
}
public ResubmittingScheduledThreadPoolExecutor(
int poolSize, ScheduledExceptionHandler handler) {
super(poolSize);
this.handler = handler;
}
private class FixedRateParameters {
private Runnable command;
private long period;
private TimeUnit unit;
/**
* We do not need initialDelay, since we can set it to period
*/
public FixedRateParameters(Runnable command, long period,
TimeUnit unit) {
this.command = command;
this.period = period;
this.unit = unit;
}
}
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period,
TimeUnit unit) {
ScheduledFuture<?> future = super.scheduleAtFixedRate(
command, initialDelay, period, unit);
runnables.put(future,
new FixedRateParameters(command, period, unit));
return future;
}
protected void afterExecute(Runnable r, Throwable t) {
ScheduledFuture future = (ScheduledFuture) r;
// future.isDone() is always false for scheduled tasks,
// unless there was an exception
if (future.isDone()) {
try {
future.get();
} catch (ExecutionException e) {
Throwable problem = e.getCause();
FixedRateParameters parms = runnables.remove(r);
if (problem != null && parms != null) {
boolean resubmitThisTask =
handler.exceptionOccurred(problem);
if (resubmitThisTask) {
scheduleAtFixedRate(parms.command, parms.period,
parms.period, parms.unit);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
We can see how this works by looking at an example that
decides to retry five times and then gives up. Such a system
would be more robust than just giving up after the first
failure, and since we put a limit on the retries, we also
avoid livelook issues.
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ResubmittingTest {
public static void main(String[] args)
throws InterruptedException {
ScheduledExecutorService service2 =
new ResubmittingScheduledThreadPoolExecutor(
5, new MyScheduledExceptionHandler());
service2.scheduleAtFixedRate(
new MyRunnable(), 2, 1, TimeUnit.SECONDS);
}
private static class MyRunnable implements Runnable {
public void run() {
if (Math.random() < 0.3) {
System.out.println("I have a problem");
throw new IllegalArgumentException("I have a problem");
}
System.out.println("I'm happy");
}
}
/** As an example, we give up after 5 failures. */
private static class MyScheduledExceptionHandler
implements ScheduledExceptionHandler {
private AtomicInteger problems = new AtomicInteger();
public boolean exceptionOccurred(Throwable e) {
e.printStackTrace();
if (problems.incrementAndGet() >= 5) {
System.err.println("We give up!");
return false;
}
System.err.println("Resubmitting task to scheduler");
return true;
}
}
}
That's all for now - hope you enjoyed this newsletter and
will find it useful in your work!
Heinz
P.S. Another solution to managing exceptions is to catch them
in the tasks themselves, thus not letting them bubble up. In
our case we were looking for a general solution to the
cancelled task problems.
Tips and Tricks Articles
Related Java Course
|