The Java Specialists' Newsletter
Issue 016 2001-04-11
Category: Performance
Blocking Queue
by Dr. Heinz M. Kabutz
Welcome to the 16th issue of "The Java(tm) Specialists'
Newsletter", written in a dreary-weathered-Germany. Since I'm
a summer person, I really like living in South Africa where we
have 9 months of summer and 3 months of sort-of-winter. It's
quite difficult to explain to my 2-year-old son the concepts of
snow, snow-man, snow-ball, etc. Probably as difficult as
explaining to a German child the concepts of cloudless-sky,
beach, BSE-free meat, etc.
Next week I will again not be able to post the newsletter due to
international travelling (Mauritius), but the week after that I
will demonstrate how it is possible to write type-safe enum
types in Java using inner classes and how it is possible to
"switch" on their object references. Switch statements should
never be used, but it is nevertheless fascinating to watch how
the Java language constructs can be abused...
Blocking Queues for inter-thread communication
This week I want to speak about a very useful construct that we
use for inter-thread communication, called a blocking queue.
Quite often in threaded applications we have a producer-consumer
situation where some threads want to push jobs onto a queue, and
some other worker threads want to remove jobs from the queue and
then execute them. It is quite useful in such circumstances to
write a queue which blocks on pop when there is nothing on the
queue. Otherwise the consumers would have to poll, and polling
wastes CPU cycles whenever the queue is empty.
I have written a very simple version of the BlockingQueue. A more
advanced version could include alarms that are raised when the
queue reaches a certain length.
---
Warning Advanced:
When I write pieces of code which are synchronized, I usually
avoid synchronizing on "this" or marking the whole method as
synchronized. When you synchronize on "this" inside the class,
it might happen that other code outside of your control also
synchronizes on the handle to your object, or worse, calls notify
on your handle. This would severely mess up your well-written
BlockingQueue code. As a habit, I therefore always use private
data members as locks inside a class. In this case I use the
private queue data member.
Another disadvantage of indiscriminately synchronizing on "this"
is that it is very easy to then lock out parts of your class
which do not necessarily have to be locked out from each other.
For example, I might have a list of listeners in my BlockingQueue
which are notified when the queue gets too long. Adding and
removing such listeners from the BlockingQueue should be
synchronized, but not on the same lock as the push and pop
operations, otherwise you limit concurrency.
---
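The two points in the warning above (private locks, and a separate lock for the listener list) can be sketched roughly as follows. This is only an illustration under assumptions: the names AlarmingBlockingQueue, QueueLengthListener, queueTooLong and the threshold parameter are hypothetical and do not appear in the newsletter's code, and the sketch uses generics, which need a newer Java than the listings in this issue.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/** Hypothetical callback interface, invented for this sketch. */
interface QueueLengthListener {
  void queueTooLong(int length);
}

class AlarmingBlockingQueue {
  // Lock 1: guards the queue contents and is used for wait/notify.
  private final LinkedList<Object> queue = new LinkedList<>();
  // Lock 2: guards only the listener list, so adding or removing a
  // listener never blocks a thread that is pushing or popping.
  private final List<QueueLengthListener> listeners = new ArrayList<>();
  // Assumed alarm level; listeners are called at or above this length.
  private final int threshold;

  AlarmingBlockingQueue(int threshold) {
    this.threshold = threshold;
  }

  public void addListener(QueueLengthListener l) {
    synchronized (listeners) { listeners.add(l); }
  }

  public void removeListener(QueueLengthListener l) {
    synchronized (listeners) { listeners.remove(l); }
  }

  public void push(Object o) {
    int length;
    synchronized (queue) {
      queue.add(o);
      length = queue.size();
      queue.notify();
    }
    // The alarm is raised after the queue lock has been released.
    if (length >= threshold) {
      fireTooLong(length);
    }
  }

  public Object pop() throws InterruptedException {
    synchronized (queue) {
      while (queue.isEmpty()) {
        queue.wait();
      }
      return queue.removeFirst();
    }
  }

  private void fireTooLong(int length) {
    List<QueueLengthListener> copy;
    synchronized (listeners) {
      copy = new ArrayList<>(listeners);
    }
    // Call back outside both locks so a slow listener holds nothing up.
    for (QueueLengthListener l : copy) {
      l.queueTooLong(length);
    }
  }
}
```

Copying the listener list before calling back is a design choice in this sketch: it lets a listener remove itself during the callback without a ConcurrentModificationException.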
//: BlockingQueue.java
import java.util.LinkedList;

public class BlockingQueue {
  /**
    It makes logical sense to use a linked list for a FIFO queue,
    although an ArrayList is usually more efficient for a short
    queue (on most VMs). The list also serves as the private lock.
  */
  private final LinkedList queue = new LinkedList();

  /**
    This method pushes an object onto the end of the queue, and
    then notifies one of the waiting threads.
  */
  public void push(Object o) {
    synchronized(queue) {
      queue.add(o);
      queue.notify();
    }
  }

  /**
    The pop operation blocks until either an object is returned
    or the thread is interrupted, in which case it throws an
    InterruptedException.
  */
  public Object pop() throws InterruptedException {
    synchronized(queue) {
      // Loop, because a woken thread may find the queue empty again
      while (queue.isEmpty()) {
        queue.wait();
      }
      return queue.removeFirst();
    }
  }

  /** Return the number of elements currently in the queue. */
  public int size() {
    // LinkedList is not thread-safe, so reading needs the lock too
    synchronized(queue) {
      return queue.size();
    }
  }
}
Now we've got a nice little test case that uses the blocking
queue for 10 worker threads which will each pull as many tasks as
possible from the queue. To end the test, we put one poison pill
onto the queue for each of the worker threads, which, when
executed, interrupts the current thread (evil laughter).
//: BlockingQueueTest.java
public class BlockingQueueTest {
  private final BlockingQueue bq = new BlockingQueue();

  /**
    The Worker thread is not very robust. If a RuntimeException
    occurs in the run method, the thread will stop.
  */
  private class Worker extends Thread {
    public Worker(String name) { super(name); start(); }
    public void run() {
      try {
        while(!isInterrupted()) {
          ((Runnable)bq.pop()).run();
        }
      } catch(InterruptedException ex) {
        // interrupted while waiting in pop(): treat as a stop request
      }
      System.out.println(getName() + " finished");
    }
  }

  public BlockingQueueTest() {
    // We create 10 threads as workers
    Thread[] workers = new Thread[10];
    for (int i=0; i<workers.length; i++)
      workers[i] = new Worker("Worker Thread " + i);
    // We then push 100 commands onto the queue
    for (int i=0; i<100; i++) {
      final String msg = "Task " + i + " completed";
      bq.push(new Runnable() {
        public void run() {
          System.out.println(msg);
          // Sleep a random amount of time, up to 1 second
          try { Thread.sleep((long)(Math.random()*1000)); }
          catch(InterruptedException ex) { }
        }
      });
    }
    // We then push one "poison pill" onto the queue for each
    // worker thread, which will only be processed once the other
    // tasks have been taken off the queue.
    for (int i=0; i<workers.length; i++) {
      bq.push(new Runnable() {
        public void run() {
          Thread.currentThread().interrupt();
        }
      });
    }
    // Lastly we join each of the Worker threads, so that we only
    // continue once all the worker threads are finished.
    for (int i=0; i<workers.length; i++) {
      try {
        workers[i].join();
      } catch(InterruptedException ex) {}
    }
    System.out.println("BlockingQueueTest finished");
  }

  public static void main(String[] args) throws Exception {
    new BlockingQueueTest();
  }
}
The concepts in this newsletter can be expanded quite a bit. They
could, for example, be used as a basis for implementing a
ThreadPool. Alternatively, you can implement an "ActiveQueue" in
which a Thread running inside the queue performs callbacks to
listeners each time an event is pushed onto the queue.
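As a rough idea of how a ThreadPool could grow out of the blocking queue, here is a minimal sketch. It is an assumption-laden illustration, not the author's implementation: the class name SimpleThreadPool and the methods execute and shutdownAndWait are made up for this example, the queue logic is inlined rather than reusing the BlockingQueue class, and a shutdown flag is swapped in for the poison pills used in the test above.

```java
import java.util.LinkedList;

class SimpleThreadPool {
  // The task queue doubles as the private lock, as in BlockingQueue.
  private final LinkedList<Runnable> queue = new LinkedList<>();
  private final Thread[] workers;
  private boolean shutdown = false; // guarded by the queue lock

  SimpleThreadPool(int size) {
    workers = new Thread[size];
    for (int i = 0; i < size; i++) {
      workers[i] = new Thread(this::workLoop, "Pool Worker " + i);
    }
    // Start only after every field has been initialised.
    for (Thread t : workers) {
      t.start();
    }
  }

  /** Queues a task and wakes up one waiting worker. */
  public void execute(Runnable task) {
    synchronized (queue) {
      if (shutdown) {
        throw new IllegalStateException("pool is shut down");
      }
      queue.add(task);
      queue.notify();
    }
  }

  /** Lets the queued tasks finish, then waits for all workers to end. */
  public void shutdownAndWait() throws InterruptedException {
    synchronized (queue) {
      shutdown = true;
      queue.notifyAll(); // wake every idle worker so it can exit
    }
    for (Thread t : workers) {
      t.join();
    }
  }

  private void workLoop() {
    while (true) {
      Runnable task;
      synchronized (queue) {
        while (queue.isEmpty() && !shutdown) {
          try {
            queue.wait();
          } catch (InterruptedException ex) {
            return; // treat an interrupt as a request to stop
          }
        }
        if (queue.isEmpty()) {
          return; // shut down and nothing left to run
        }
        task = queue.removeFirst();
      }
      try {
        task.run(); // run outside the lock so workers do not block each other
      } catch (RuntimeException ex) {
        // Unlike the Worker in the test, keep the thread alive.
        ex.printStackTrace();
      }
    }
  }
}
```

A caller would create the pool, hand it work with pool.execute(someRunnable), and call pool.shutdownAndWait() once no more tasks will be submitted.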
It is also possible to use PipedInputStream and PipedOutputStream
to send messages between threads, but then you have to set up a
whole protocol, and if you want to exchange objects you have to
use ObjectOutputStream and ObjectInputStream, which will be a lot
slower than just passing handles.
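For comparison, here is a small sketch of what passing a single object through a pipe involves. The class name PipedMessageDemo and the method sendThroughPipe are hypothetical, chosen only for this illustration. Note that the receiver gets a deserialized copy of the object rather than the same handle, which is part of the extra work the paragraph above refers to.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.Serializable;

class PipedMessageDemo {
  /**
    Sends one object from a producer thread to the calling thread
    through a pipe and returns the copy that arrives.
  */
  static Object sendThroughPipe(final Serializable message) throws Exception {
    final PipedOutputStream pipeOut = new PipedOutputStream();
    PipedInputStream pipeIn = new PipedInputStream(pipeOut);

    // The writer runs in its own thread; using both ends of a pipe
    // from a single thread risks a deadlock.
    Thread producer = new Thread(() -> {
      try (ObjectOutputStream out = new ObjectOutputStream(pipeOut)) {
        out.writeObject(message); // serializes the whole object graph
      } catch (IOException ex) {
        ex.printStackTrace();
      }
    }, "Producer");
    producer.start();

    try (ObjectInputStream in = new ObjectInputStream(pipeIn)) {
      return in.readObject(); // blocks until the object has arrived
    } finally {
      producer.join();
    }
  }

  public static void main(String[] args) throws Exception {
    StringBuilder original = new StringBuilder("Task 1");
    Object received = sendThroughPipe(original);
    System.out.println("content: " + received);
    System.out.println("same handle: " + (received == original));
  }
}
```

With the BlockingQueue, the consumer receives the very handle that the producer pushed, so no serialization or copying takes place.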
Until next week, and please remember to forward this newsletter
in its entirety to as many Java users as you know.
Heinz