Home of The JavaSpecialists' Newsletter

Issue 016: Blocking Queues for Inter-Thread Communication

Author: Dr. Heinz M. Kabutz
Date: 2001-04-11
Java Version: 1.2
Category: Performance
 

Abstract: In the days before BlockingQueue was added to Java, we had to write our own. This newsletter describes an approach using synchronized and wait()/notify().

 

Welcome to the 16th issue of The Java(tm) Specialists' Newsletter, written in a dreary-weathered-Germany. Since I'm a summer person, I really like living in South Africa where we have 9 months of summer and 3 months of sort-of-winter. It's quite difficult to explain to my 2-year old son the concepts of snow, snow-man, snow-ball, etc. Probably as difficult as explaining to a German child the concepts of cloudless-sky, beach, BSE-free meat, etc.

Next week I will again not be able to post the newsletter due to international travelling (Mauritius), but the week after that I will demonstrate how it is possible to write type-safe enum types in Java using inner classes and how it is possible to "switch" on their object references. Switch statements should never be used, but it is nevertheless fascinating to watch how the Java language constructs can be abused...


Blocking Queues for Inter-Thread Communication

This week I want to speak about a very useful construct that we use for inter-thread communication, called a blocking queue. Quite often in threaded applications we have a producer-consumer situation where some threads want to push jobs onto a queue, and some other worker threads want to remove jobs from the queue and then execute them. It is quite useful in such circumstances to write a queue whose pop operation blocks when there is nothing on the queue. Otherwise the consumers would have to poll, and polling wastes CPU cycles for as long as the queue stays empty.

I have written a very simple version of the BlockingQueue. A more advanced version would include alarms that are generated when the queue reaches a certain length.

---
Warning Advanced:
When I write pieces of code which are synchronized, I usually avoid synchronizing on "this" or marking the whole method as synchronized. When you synchronize on "this" inside the class, it might happen that other code outside of your control also synchronizes on the handle to your object, or worse, calls notify on your handle. This would severely mess up your well-written BlockingQueue code. I therefore, as a habit, always use private data members as locks inside a class. In this case I use the private queue data member.

Another disadvantage of indiscriminately synchronizing on "this" is that it is very easy to then lock out parts of your class which do not necessarily have to be locked out from each other. For example, I might have a list of listeners in my BlockingQueue which are notified when the queue gets too long. Adding and removing such listeners from the BlockingQueue should be synchronized, but you do not have to synchronize them with respect to the push and pop operations, otherwise you limit concurrency.
---
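The two points in the warning above can be sketched together. The following is only an illustration, not the newsletter's own code: the names ListeningBlockingQueue, QueueLengthListener, queueTooLong and the threshold parameter are all assumptions made up for this example. It uses the private queue as the lock for push and pop, and the private listener list as a separate lock, so that registering listeners never blocks the queue operations. It uses generics, which did not exist in Java 1.2.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/** Hypothetical callback interface, invented for this sketch. */
interface QueueLengthListener {
  void queueTooLong(int size);
}

/** Sketch only: a blocking queue with a separate lock for its listeners. */
class ListeningBlockingQueue {
  // Lock 1: guards the queue contents and is used for wait()/notify().
  private final LinkedList<Object> queue = new LinkedList<Object>();
  // Lock 2: guards only the listener list, independent of push/pop.
  private final List<QueueLengthListener> listeners =
      new ArrayList<QueueLengthListener>();
  // Assumed alarm threshold; listeners fire when the size exceeds it.
  private final int threshold;

  ListeningBlockingQueue(int threshold) {
    this.threshold = threshold;
  }

  public void addListener(QueueLengthListener l) {
    synchronized (listeners) { listeners.add(l); }
  }

  public void removeListener(QueueLengthListener l) {
    synchronized (listeners) { listeners.remove(l); }
  }

  public void push(Object o) {
    int size;
    synchronized (queue) {
      queue.add(o);
      size = queue.size();
      queue.notify();
    }
    // Listeners are called outside the queue lock, so a slow
    // listener does not hold up other producers and consumers.
    if (size > threshold) fireQueueTooLong(size);
  }

  public Object pop() throws InterruptedException {
    synchronized (queue) {
      while (queue.isEmpty()) {
        queue.wait();
      }
      return queue.removeFirst();
    }
  }

  private void fireQueueTooLong(int size) {
    List<QueueLengthListener> snapshot;
    synchronized (listeners) {
      // Copy under the lock, then call back without holding it.
      snapshot = new ArrayList<QueueLengthListener>(listeners);
    }
    for (QueueLengthListener l : snapshot) {
      l.queueTooLong(size);
    }
  }
}
```

Because both lock objects are private, code outside the class cannot synchronize on them or call notify on them by accident.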

//: BlockingQueue.java
import java.util.*;
public class BlockingQueue {
  /**
    It makes logical sense to use a linked list for a FIFO queue,
    although an ArrayList is usually more efficient for a short
    queue (on most VMs).
   */
  private final LinkedList queue = new LinkedList();
  /**
    This method pushes an object onto the end of the queue, and
    then notifies one of the waiting threads.
   */
  public void push(Object o) {
    synchronized(queue) {
      queue.add(o);
      queue.notify();
    }
  }
  /**
    The pop operation blocks until either an object is returned
    or the thread is interrupted, in which case it throws an
    InterruptedException.
   */
  public Object pop() throws InterruptedException {
    synchronized(queue) {
      while (queue.isEmpty()) {
        queue.wait();
      }
      return queue.removeFirst();
    }
  }
  /** Return the number of elements currently in the queue. */
  public int size() {
    synchronized(queue) {
      return queue.size();
    }
  }
}

Now we've got a nice little test case that uses the blocking queue for 10 worker threads which will each pull as many tasks as possible from the queue. To end the test, we put one poison pill onto the queue for each of the worker threads, which, when executed, interrupts the current thread (evil laughter).

//: BlockingQueueTest.java
public class BlockingQueueTest {
  private final BlockingQueue bq = new BlockingQueue();
  /**
    The Worker thread is not very robust.  If a RuntimeException
    occurs in the run method, the thread will stop.
   */
  private class Worker extends Thread {
    public Worker(String name) { super(name); start(); }
    public void run() {
      try {
        while(!isInterrupted()) {
          ((Runnable)bq.pop()).run();
        }
      } catch(InterruptedException ex) {}
      System.out.println(getName() + " finished");
    }
  }
  public BlockingQueueTest() {
    // We create 10 threads as workers
    Thread[] workers = new Thread[10];
    for (int i=0; i<workers.length; i++)
      workers[i] = new Worker("Worker Thread " + i);
    // We then push 100 commands onto the queue
    for (int i=0; i<100; i++) {
      final String msg = "Task " + i + " completed";
      bq.push(new Runnable() {
        public void run() {
          System.out.println(msg);
          // Sleep a random amount of time, up to 1 second
          try { Thread.sleep((long)(Math.random()*1000)); }
          catch(InterruptedException ex) { }
        }
      });
    }
    // We then push one "poison pill" onto the queue for each
    // worker thread, which will only be processed once the other
    // tasks are completed.
    for (int i=0; i<workers.length; i++) {
      bq.push(new Runnable() {
        public void run() {
          Thread.currentThread().interrupt();
        }
      });
    }
    // Lastly we join ourself to each of the Worker threads, so
    // that we only continue once all the worker threads are
    // finished.
    for (int i=0; i<workers.length; i++) {
      try {
        workers[i].join();
      } catch(InterruptedException ex) {}
    }
    System.out.println("BlockingQueueTest finished");
  }
  public static void main(String[] args) throws Exception{
    new BlockingQueueTest();
  }
}

The concepts in the newsletter can be expanded quite a bit. They could, for example, be used as a basis for implementing a ThreadPool, or otherwise you can implement an "ActiveQueue" which performs callbacks to listeners each time an event is pushed onto the queue via a Thread running inside the ActiveQueue.
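As a rough idea of how the ThreadPool variant might look, here is a minimal sketch that folds the queue and the workers into one class. The class name SimpleThreadPool and its methods execute and shutdown are assumptions chosen for this example, not an existing API. It reuses the private-lock and poison-pill techniques from the listings above, and additionally catches RuntimeException so that a failing task does not stop its worker.

```java
import java.util.LinkedList;

/** Sketch only: a fixed-size pool fed by a blocking queue of tasks. */
class SimpleThreadPool {
  // Private lock object, as in BlockingQueue above.
  private final LinkedList<Runnable> queue = new LinkedList<Runnable>();
  private final Thread[] workers;

  SimpleThreadPool(int poolSize) {
    workers = new Thread[poolSize];
    for (int i = 0; i < workers.length; i++) {
      workers[i] = new Thread("Pool Worker " + i) {
        public void run() { workLoop(); }
      };
      workers[i].start();
    }
  }

  /** Queues a task and wakes up one waiting worker. */
  public void execute(Runnable task) {
    synchronized (queue) {
      queue.add(task);
      queue.notify();
    }
  }

  /** Blocks until a task is available, like pop() above. */
  private Runnable nextTask() throws InterruptedException {
    synchronized (queue) {
      while (queue.isEmpty()) {
        queue.wait();
      }
      return queue.removeFirst();
    }
  }

  private void workLoop() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        try {
          nextTask().run();
        } catch (RuntimeException ex) {
          // Keep the worker alive if a single task fails.
          ex.printStackTrace();
        }
      }
    } catch (InterruptedException ex) {
      // Interrupted while waiting for work: fall through and exit.
    }
  }

  /** Queues one poison pill per worker, then waits for all of them. */
  public void shutdown() throws InterruptedException {
    for (int i = 0; i < workers.length; i++) {
      execute(new Runnable() {
        public void run() { Thread.currentThread().interrupt(); }
      });
    }
    for (int i = 0; i < workers.length; i++) {
      workers[i].join();
    }
  }
}
```

Since the queue is FIFO, tasks submitted before shutdown() are taken off the queue before any poison pill, so pending work still runs before the workers stop.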

It is also possible to use PipedInputStream and PipedOutputStream to send messages between threads, but then you have to set up a whole protocol, and if you want to exchange objects you have to use ObjectOutputStream, which will be a lot slower than just passing handles.
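For comparison, the piped-stream alternative might look roughly like the sketch below. The class PipedMessageDemo and the method sendThroughPipe are hypothetical names used only for this illustration. A sender thread serializes one message into the pipe and the calling thread deserializes it on the other end, so the receiver gets a copy of the object rather than the same handle.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

/** Sketch only: passing one object between threads through a pipe. */
class PipedMessageDemo {
  /**
    Sends a Serializable message from a sender thread to the calling
    thread and returns the deserialized copy.
   */
  static Object sendThroughPipe(final Object message)
      throws IOException, ClassNotFoundException, InterruptedException {
    final PipedOutputStream pipeOut = new PipedOutputStream();
    PipedInputStream pipeIn = new PipedInputStream(pipeOut);

    Thread sender = new Thread("Sender") {
      public void run() {
        try {
          ObjectOutputStream out = new ObjectOutputStream(pipeOut);
          out.writeObject(message);
          out.close(); // flushes the data and closes the pipe
        } catch (IOException ex) {
          ex.printStackTrace();
        }
      }
    };
    sender.start();

    // Blocks until the sender has written the stream header.
    ObjectInputStream in = new ObjectInputStream(pipeIn);
    Object received = in.readObject();
    sender.join();
    in.close();
    return received;
  }
}
```

Every message pays for serialization on one side and deserialization on the other, whereas the blocking queue only hands over a reference.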

Until next week, and please remember to forward this newsletter in its entirety to as many Java users as you know.

Heinz

 
