Home of The JavaSpecialists' Newsletter

Issue 261: Concurrent Queue Sizes and Hot Fields

Author: Dr. Heinz M. Kabutz
Posted: 2018-09-20
Category: Concurrency
Java Version: OpenJDK 11
 

Abstract: ConcurrentLinkedQueue's size() method is not very useful in a multi-threaded environment, because it counts the number of elements in the queue, rather than relying on a "hot field" to store the size. The result might be completely incorrect and, in strange situations, the call might never return.

 

Welcome to the 261st edition of The Java(tm) Specialists' Newsletter. Next month I am speaking at Oracle Code One (formerly JavaOne). My topics are concurrency (Phaser, StampedLock and VarHandles), performance (evolution of java.lang.String) and unconferences (a BoF about JCrete and how to disorganize such an event). Usually my talks fall in the category of "mildly entertaining", so unless you have something interesting going on during my slots, please join me :-)

At JCrete 2018 we put together a list of recommended books. Only 4/18 have Java in their title. Here they are - happy reading:



Concurrent Queue Sizes and Hot Fields

java.util.Collection has a size() method. Who would not want to know how many elements are in their collection?

In concurrency, we have the concept of a "hot field". Such a field has to be read or written by almost every method we call on the object, so all threads using the object end up contending for it. For example, inside the ArrayBlockingQueue, the items array is a "hot field". We cannot add an item whilst removing another, because both methods lock on the same ReentrantLock to access the array.
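To make the idea concrete, here is a minimal sketch of a bounded queue in which every operation goes through one lock. SingleLockQueue is a hypothetical class written for this illustration; it is not the ArrayBlockingQueue source, only a simplified picture of the same single-lock design:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified bounded queue (NOT the JDK's ArrayBlockingQueue).
final class SingleLockQueue<E> {
  // One lock guards every field below, so offer() and poll() can never overlap.
  private final ReentrantLock lock = new ReentrantLock();
  private final Object[] items; // the "hot field"
  private int head, tail, count;

  SingleLockQueue(int capacity) {
    items = new Object[capacity];
  }

  boolean offer(E e) {
    lock.lock();
    try {
      if (count == items.length) return false; // full
      items[tail] = e;
      tail = (tail + 1) % items.length;
      count++;
      return true;
    } finally {
      lock.unlock();
    }
  }

  @SuppressWarnings("unchecked")
  E poll() {
    lock.lock();
    try {
      if (count == 0) return null; // empty
      E e = (E) items[head];
      items[head] = null;
      head = (head + 1) % items.length;
      count--;
      return e;
    } finally {
      lock.unlock();
    }
  }

  int size() {
    lock.lock();
    try {
      return count; // exact, because it is read under the same lock
    } finally {
      lock.unlock();
    }
  }
}
```

The upside of this design is that size() is exact and O(1). The price is that producers and consumers always queue up behind the same lock.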

ConcurrentLinkedQueue was designed to be a generalized thread-safe FIFO queue for when we did not need a BlockingQueue. It should not matter whether we had multi-producer-multi-consumer systems (MPMC) or single-producer-single-consumer (SPSC) or a combination of both. In all cases it should work correctly. The queue should also be unbounded and non-blocking in its thread-safety implementation.

In the first version of ConcurrentLinkedQueue, thread safety was managed with AtomicReferenceFieldUpdater, but this was changed in Java 7 to sun.misc.Unsafe and in Java 9 to VarHandle. The designers fastidiously avoided "hot fields". For example, the head and tail of the ConcurrentLinkedQueue can be updated independently of each other. Thus one thread can add whilst another thread removes elements from the queue without causing contention.
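For readers who have not used VarHandle yet, the following is a rough sketch of a lock-free compare-and-set on a single field. TailHolder and its field are hypothetical names chosen for this illustration; the real queue is considerably more subtle:

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical illustration class; the name "tail" only echoes the queue's tail pointer.
final class TailHolder {
  private volatile Object tail;

  private static final VarHandle TAIL;
  static {
    try {
      // Look up a handle to the private field from inside its own class.
      TAIL = MethodHandles.lookup()
          .findVarHandle(TailHolder.class, "tail", Object.class);
    } catch (ReflectiveOperationException e) {
      throw new ExceptionInInitializerError(e);
    }
  }

  // Succeeds only if the field still holds the expected reference.
  boolean casTail(Object expected, Object update) {
    return TAIL.compareAndSet(this, expected, update);
  }

  Object tail() {
    return tail;
  }
}
```

A thread whose compareAndSet fails simply re-reads the field and retries, which is why no lock is needed. Since the head and the tail live in separate fields, a CAS on one does not have to touch the other.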

Another field where they wanted to avoid contention was the size of the queue. Even though java.util.Collection offers size() as a method, the designers decided that ConcurrentLinkedQueue would not have to implement it in a useful way. Yep, the Javadoc of size() even states this explicitly: "this method is typically not very useful in concurrent applications". Thus instead of storing the size inside an AtomicInteger, necessitating contended updates on both add and remove of the queue, they count the elements every time we call size().
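To see what was avoided, here is a sketch of the alternative: a wrapper that keeps the size in an AtomicInteger. CountedQueue is a hypothetical class for illustration only, and is not how ConcurrentLinkedQueue is implemented:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper; NOT part of the JDK.
final class CountedQueue<E> {
  private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();
  // The "hot field": every add and every successful poll must update it.
  private final AtomicInteger size = new AtomicInteger();

  void add(E e) {
    // Increment first, so the counter can never drop below zero;
    // it may briefly overstate the size while an add is in flight.
    size.incrementAndGet();
    queue.add(e);
  }

  E poll() {
    E e = queue.poll();
    if (e != null) {
      size.decrementAndGet();
    }
    return e;
  }

  int size() {
    return size.get(); // O(1), at the cost of contended updates elsewhere
  }
}
```

With this wrapper size() is cheap, but producers and consumers now meet on the same counter, even though the underlying queue lets them work on head and tail independently.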

Two effects can be seen. Firstly, the time it takes to execute size() is directly related to the length of the queue. Secondly, in size() we start counting from the head of the queue. If, whilst we are counting, new elements are added to the back and old ones removed from the front, the result might be larger than the number of items that were ever in the queue at any one time. In rare situations, these two effects could combine so that size() never returns. Consider this InfiniteSize class, written with the new Java 10 var syntax where it makes sense:
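The over-counting effect can be reproduced deterministically with a single-threaded simulation, in which the "other thread" is interleaved by hand after each step of the walk. WalkingCount is a hypothetical, heavily simplified linked list written for this sketch; the real ConcurrentLinkedQueue traversal has more cases, but the principle is the same:

```java
// Hypothetical single-threaded simulation of a counting size().
final class WalkingCount {
  static final class Node {
    final String item;
    Node next;
    Node(String item) { this.item = item; }
  }

  private Node head, tail;
  private int liveElements; // true element count, kept only for comparison

  void add(String item) {
    Node n = new Node(item);
    if (tail == null) {
      head = tail = n;
    } else {
      tail.next = n;
      tail = n;
    }
    liveElements++;
  }

  void remove() { // caller guarantees the list is not empty
    head = head.next;
    if (head == null) tail = null;
    liveElements--;
  }

  int liveElements() {
    return liveElements;
  }

  // Walks from the head, like a counting size(). After each of the first
  // `churn` steps, simulates another thread doing one add and one remove.
  int countWhileChurning(int churn) {
    int count = 0;
    for (Node p = head; p != null; p = p.next) {
      count++;
      if (churn > 0) {
        churn--;
        add("new"); // appended behind us, so the walk will reach it
        remove();   // removed in front of us, but already counted
      }
    }
    return count;
  }
}
```

Starting with three elements and calling countWhileChurning(5), the walk visits the three original nodes plus the five appended ones and returns 8, although the list never held more than four elements at once. With unlimited churn the walk would never reach the end.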

import java.util.*;
import java.util.concurrent.*;

public class InfiniteSize {
  public static void main(String... args) {
    var queue = new ConcurrentLinkedQueue<String>();
    for (int i = 0; i < 40_000_000; i++) {
      queue.add("test");
    }

    var phaser = new Phaser(900);
    for (int i = 0; i < 1000; i++) { // 1000 threads?  Seriously?
      Thread t = new Thread(() -> {
        phaser.arriveAndAwaitAdvance();
        while (true) {
          queue.add("test");
          queue.remove();
        }
      });
      t.setDaemon(true);
      t.start();
    }
    var time = System.nanoTime();
    try {
      System.out.println("Measuring queue size");
      System.out.println(queue.size()); // might never return
    } finally {
      time = System.nanoTime() - time;
      System.out.printf("time = %dms%n", (time / 1_000_000));
    }
  }
}

The code in InfiniteSize is truly dreadful. 1000 threads are all contending for the queue. Note how I use Phaser to let the threads start, but then throttle them back a bit.

I ran this with the new OpenJDK 11 Epsilon Garbage Collector (-verbose:gc -Xmx16g -XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC). After it printed "Measuring queue size", it eventually crashed with all 16 GB exhausted. In case you have not heard of Epsilon, it does not collect any unused objects. It is a great collector to use for running performance tests if you want to eliminate GC costs from your experiment. I also tried with G1, but that was hopeless and spent 50% of its time in GC. I let it run for 400 seconds before killing the process.

Since Epsilon GC does not spend resources trying to free memory, we can probably conclude that the call to size() was not keeping up with the other 1000 threads adding and removing elements. Or maybe there was something else. As I said, it is dreadful code and fortunately not at all representative of what we would find in production. For normal code, it won't happen that size() takes forever to return. It is an O(n) method call, but if your queues are so long that this is significant, you have bigger problems and should perhaps consider a career at airport security ;-)

I think we can thus safely ignore the O(n) cost of size() and instead focus on how correct it is. It is not. The value might not be at all related to how many elements were ever in the queue at one time.

Here is another class that creates a queue with 10 elements and then adds and removes elements with another thread. Whilst that is going on, size() is called and we see what the largest value is. Note that I again use the Java 10 var syntax, but not to replace the for (int ... since that would neither make the code clearer nor save typing.

import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;

public class QueueSize {
  public static final int RESIDENT_QUEUE_SIZE = 10;
  public static final int ADD_AND_REMOVES = 1_000_000;

  public static void main(String... args) {
    for (int i = 0; i < 10; i++) {
      // size either 10 or 11
      test(LinkedBlockingQueue::new);

      // size always >= 10
      test(LinkedTransferQueue::new);
      test(ConcurrentLinkedQueue::new);
      System.out.println();
    }
  }

  private static void test(Supplier<Queue<String>> queueType) {
    var queue = queueType.get();
    for (int i = 0; i < RESIDENT_QUEUE_SIZE; i++) {
      queue.add("test" + i);
    }

    var thread = addRemoveThread(queue);

    var maxSize = 0;
    while (thread.isAlive()) {
      maxSize = Math.max(maxSize, queue.size());
    }
    System.out.printf(Locale.US, "%s: maxSize=%d%n",
        queue.getClass().getSimpleName(), maxSize);
  }

  private static Thread addRemoveThread(Queue<String> queue) {
    var thread = new Thread(() -> {
      for (int i = 0; i < ADD_AND_REMOVES; i++) {
        queue.add("test" + (i + RESIDENT_QUEUE_SIZE));
        queue.remove();
      }
    }, "addRemoveThread");
    thread.start();
    return thread;
  }
}
  

The largest value of size() with the LinkedBlockingQueue is 11. LinkedTransferQueue and ConcurrentLinkedQueue could return anything >= 10. For example, I saw:

LinkedBlockingQueue: maxSize=11
LinkedTransferQueue: maxSize=54
ConcurrentLinkedQueue: maxSize=38
  

The values for size() are not just stale, but can be completely wrong. It would have been more honest to either always return 0 or to throw an UnsupportedOperationException. In a single-threaded situation, size() would work, but then we would rather use an ArrayDeque or a LinkedList. The challenge is that the contract of Collection.size() does not say anything about the possibility of an UnsupportedOperationException, so implementations should refrain from throwing it.

Of course, if you know how your queue will be used, it is better to use a queue specific to your requirements. Have a look at Java Champion Nitsan Wakart's excellent JCTools, inspired by Martin Thompson's work on Mechanical Sympathy. The default queue that is created by QueueFactory when we specify an unbounded MPMC queue is the ConcurrentLinkedQueue. Make it bounded or change it to SPSC and the resultant queue can be factors faster.

Back to the hot field. I believe there was a thought recently to store ConcurrentLinkedQueue's size inside a LongAdder, since that has much better performance under contention than AtomicInteger. Whilst a good argument, it might also dramatically increase the memory footprint of each ConcurrentLinkedQueue on multi-processor systems if the field ends up being contended, because a contended LongAdder spreads its value over an array of padded cells.
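As a rough sketch of that idea, here is a wrapper that tracks the size in a LongAdder. AdderCountedQueue is a hypothetical class for illustration; nothing like it exists inside ConcurrentLinkedQueue:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical wrapper; NOT part of the JDK.
final class AdderCountedQueue<E> {
  private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();
  // Under contention, a LongAdder spreads updates over several internal
  // cells, which reduces contention but costs extra memory per instance.
  private final LongAdder size = new LongAdder();

  void add(E e) {
    size.increment(); // count first, so a matching decrement never runs ahead
    queue.add(e);
  }

  E poll() {
    E e = queue.poll();
    if (e != null) {
      size.decrement();
    }
    return e;
  }

  long size() {
    // sum() is not an atomic snapshot: concurrent updates may be missed.
    return size.sum();
  }
}
```

Even with this approach the value returned by size() is only an estimate while other threads are busy, but it is bounded by the number of updates in flight rather than by how long the traversal takes.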

Thanks for reading this and please check out our concurrency training if you would like to learn more.

Kind regards

Heinz

 
