Running on Java 19-ea+28-2110 (Preview)
Home of The JavaSpecialists' Newsletter

296Concurrent LinkedHashSet

Author: Dr Heinz M. KabutzDate: 2021-12-31Java Version: 17Category: Concurrency
 

Abstract: LinkedHashSet is a set that can also maintain order. To make this thread-safe, we can wrap it with Collections.synchronizedSet(). However, this is not a good option, because iteration would still be fast-fail. And what's the point of a thread-safe LinkedHashSet that we cannot iterate over? In this newsletter we try to create a concurrent set that behaves like a LinkedHashSet, but with minimal locking and with a weakly-consistent iteration.

 

Welcome to the 296th edition of The Java(tm) Specialists' Newsletter, and warm greetings from a wet and cold Island of Crete. Our summer tourists would be quite amazed at how drenched we get. Which other place has had its bad weather inscribed into the Holy Bible? (Saint Paul got caught in a hurricane while trying to navigate the South of Crete and after a shipwreck ended up on Malta.) Oh, and we have earthquakes galore. One of my friends sent me this email a few days ago: "Earthquakes happen very often now... Is it safe to stay there?" Good question indeed. No idea. But it is a nice place to live when it ain't rockin' and rollin', or raining cats and dogs.

And don't forget - today is the last day of 2021 and your last chance to spend 2021 training budget. For example, by grabbing our Java Specialists Superpack :-)

javaspecialists.teachable.com: Please visit our new self-study course catalog to see how you can upskill your Java knowledge.

Concurrent LinkedHashSet

What's wrong with this code?

private final Set<Connection> connections = Collections.synchronizedSet(new LinkedHashSet<>());

During a recent talk, a friend showed a class with such a field. Someone complained. This would "encourage programmers to use Collections.synchronizedSet() in production code. A concurrent collection would be preferable."

Since I have a particular interest in concurrency, I jumped to my friend's defense. It is difficult writing slides. Not every snippet of code we show is an example of perfect production code. Then, a synchronized collection is not always slower than a ConcurrentHashMap. After all, the ConcurrentHashMap itself maintains its invariants with synchronized.

I do not know whether the synchronized wrapped LinkedHashSet would be slow or not. Uncontended synchronization is fast. Since we are doing networking, I would expect that to be far more costly than a little lock. A bigger concern to me is that the LinkedHashSet iteration is fast-fail. The synchronized wrapper would not protect us against a ConcurrentModificationException. We could lock the entire set during iteration, but iteration is O(n) cost and thus locking might not be such a good idea. In a threaded environment, weakly-consistent iteration is preferable.

I began to wonder how we could create a concurrent form of LinkedHashSet. Instead of TreeSet, we can use the concurrent, thread-safe ConcurrenSkipListSet. Similarly, instead of HashSet, we could use ConcurrentHashMap.newKeySet(). But LinkedHashSet did not seem to have an obvious concurrent alternative.

Here is my attempt, combining a ConcurrentHashMap with a ConcurrentSkipListSet. Our class offers a reduced interface of Set, thus only:

  • add(e)
  • remove(e)
  • contains(e)
  • stream()
  • clear()
  • iterator()
  • toString()

The ConcurrentSkipListSet maintains the insertion order. The ConcurrentHashMap ensures that elements are distinct. We store the element and insertion order inside the InsertionOrder record. These are then stored inside the ConcurrentSkipListSet, sorted by the insertion order. We store our element as a key inside the ConcurrentHashMap. The values are the same InsertionOrders that are inside the ConcurrentSkipListSet.

When we add an element, we call the computeIfAbsent() method on our map. If the element does not exist in the map yet, we create our InsertionOrder and add it to the set. When we want to remove it again, we use computeIfPresent() to also remove it from the set. Both of these compute methods are performed atomically on a ConcurrentHashMap.

Here is the class. Please shout if you can think of a better approach for a thread-safe concurrent LinkedHashSet. I have not done extensive testing on this class. Neither have I benchmarked the performance. I have thus no idea whether it is faster or slower than the original synchronize wrapped LinkedHashSet. I'm not even sure that the code is correct. Please do not use it in production without extensive testing. And when (not if) you find glaring errors, please let me know so that I can update this newsletter :-)

Note: Small change to the original newsletter, thanks to Jesper Udby. Instead of a static shared AtomicLong, we let each set contain its own and pass the order into the record. Thanks Jesper! Also thanks to Cor Takken for suggesting better names for the fields and record.

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;

public class ConcurrentLinkedReducedHashSet<E> implements Iterable<E> {
  /**
   * A tuple holding our value and its insertion order.
   */
  private record InsertionOrder<T>(T value, long order) { }
  /**
   * Contains a mapping from our element to its insertionOrder.
   */
  private final Map<E, InsertionOrder<E>> elements =
      new ConcurrentHashMap<>();
  /**
   * The insertion order maintained in a ConcurrentSkipListSet,
   * so that we iterate in the correct order.
   */
  private final Set<InsertionOrder<E>> elementsOrderedByInsertion =
      new ConcurrentSkipListSet<>(
          Comparator.comparingLong(InsertionOrder::order));
  /**
   * AtomicLong for generating the next insertion order.
   */
  private final AtomicLong nextOrder = new AtomicLong();

  public boolean add(E e) {
    var added = new AtomicBoolean(false);
    elements.computeIfAbsent(
        e, key -> {
          var holder = new InsertionOrder<>(e,
            nextOrder.getAndIncrement());
          elementsOrderedByInsertion.add(holder);
          added.set(true);
          return holder;
        }
    );
    return added.get();
  }

  public boolean remove(E e) {
    var removed = new AtomicBoolean(false);
    elements.computeIfPresent(e, (key, holder) -> {
      elementsOrderedByInsertion.remove(holder);
      removed.set(true);
      return null; // will remove the entry
    });
    return removed.get();
  }

  public boolean contains(E e) {
    return elements.containsKey(e);
  }

  public Stream<E> stream() {
    return elementsOrderedByInsertion.stream().map(InsertionOrder::value);
  }

  public void clear() {
    // slow, but ensures we remove all entries in both collections
    stream().forEach(this::remove);
  }

  @Override
  public Iterator<E> iterator() {
    return stream().iterator();
  }

  @Override
  public String toString() {
    return stream()
        .map(String::valueOf)
        .collect(Collectors.joining(", ", "[", "]"));
  }
}

In this demo, we iterate in insertion order without a ConcurrentModificationException:

public class WeaklyConsistentOrderedDemo {
  public static void main(String... args) {
//    var set = Collections.synchronizedSet(new LinkedHashSet<String>()); // CME
//    var set = ConcurrentHashMap.<String>newKeySet(); // works, wrong order
    var set = new ConcurrentLinkedReducedHashSet<String>(); // Perfect (maybe)
    set.add("hello");
    set.add("world");
    Iterator<String> it = set.iterator();
    set.add("Goodbye");
    while (it.hasNext()) {
      String next = it.next();
      System.out.println(next);
    }
  }
}

Output from our reduced set is:

hello
world
Goodbye
  

In the next demo, we remove "hello" after having iterated past it, and add it again. We would expect "hello" to thus show up at the end, and it does indeed.

public class WeaklyConsistentOrderedDemoComplex {
  public static void main(String... args) {
    var set = new ConcurrentLinkedReducedHashSet<String>();
    set.add("hello");
    set.add("world");
    set.add("Goodbye");

    Iterator<String> it = set.iterator();
    System.out.println(it.next()); // hello
    System.out.println(it.next()); // world
    set.remove("hello");
    set.add("hello");
    System.out.println(it.next()); // Goodbye
    System.out.println(it.next()); // hello
    System.out.println(it.hasNext()); // false
  }
}

The output is:

hello
world
Goodbye
hello
false
  

So far, so good. Next we have a demo that removes and adds 1.6m random numbers between 0..9 into our set. There should be no duplicates at the of the run, if everything is working.

import java.util.concurrent.*;

public class ConcurrentUpdatesDemo {
  public static void main(String... args) throws InterruptedException {
    var set = new ConcurrentLinkedReducedHashSet<Integer>();
    ExecutorService pool = Executors.newFixedThreadPool(16);
    for (int i = 0; i < 16; i++) {
      pool.submit(() -> {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        for (int j = 0; j < 100_000; j++) {
          int value = random.nextInt(0, 10);
          set.remove(value);
          set.add(value);
        }
      });
    }
    pool.shutdown();
    while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
      System.out.println("Waiting for pool to shut down");
    }
    System.out.println("set = " + set);
  }
}

That also seems to work:

set = [7, 6, 1, 3, 5, 2, 8, 9, 0, 4]
  

At the outset, I said that this would be a reduced set that only has the methods that we are going to use. But what if we need a java.util.Set? We could throw UnsupportedOperationException for all the methods, besides those we implemented.

This is surprisingly easy to do if you've read my book on dynamic proxies :-) [Or grab my course on dynamic proxies in Java here.] We first create a Set that throws UnsupportedOperationException for all methods:

Set<String> angrySet = Proxies.castProxy(
    Set.class,
    (p, m, a) -> {
      throw new UnsupportedOperationException(
          m.getName() + "() not implemented");
    }
);

Next we wrap that inside a dynamic object adapter, with our ConcurrentLinkedReducedHashSet as the adapter. The dynamic object adapter will give precedence to our methods. It will thus throw an UnsupportedOperationException whenever we try call another method from the Set. The resulting type will be a Set and we could add more methods as needed.

Set<String> set = Proxies.adapt(
    Set.class, // target interface
    angrySet, // adaptee
    new ConcurrentLinkedReducedHashSet<>() // adapter
);

A complete demo would look like this:

// Maven: eu.javaspecialists.books.dynamicproxies:core:2.0.0
import eu.javaspecialists.books.dynamicproxies.*;

import java.util.*;

public class DynamicProxiesDemo {
  public static void main(String... args) {
    Set<String> angrySet = Proxies.castProxy(
        Set.class,
        (p, m, a) -> {
          throw new UnsupportedOperationException(
              m.getName() + "() not implemented");
        }
    );

    Set<String> set = Proxies.adapt(
        Set.class, // target interface
        angrySet, // adaptee
        new ConcurrentLinkedReducedHashSet<>() // adapter
    );
    set.add("hello");
    set.add("world");
    Iterator<String> it = set.iterator();
    set.add("Goodbye");
    while (it.hasNext()) {
      String next = it.next();
      System.out.println(next);
    }
    set.clear();
    set.addAll(List.of("one")); // UnsupportedOperationException
  }
}    

Again, no guarantees about how fast this will be, nor whether it works at all. I would be delighted to see a better solution, using the standard JDK classes.

Kind regards

Heinz

 

Comments

We are always happy to receive comments from our readers. Feel free to send me a comment via email or discuss the newsletter in our JavaSpecialists Slack Channel (Get an invite here)

When you load these comments, you'll be connected to Disqus. Privacy Statement.

Related Articles

Browse the Newsletter Archive

About the Author

Heinz Kabutz Java Conference Speaker

Java Champion, author of the Javaspecialists Newsletter, conference speaking regular... About Heinz

Superpack '22

Superpack '22 Our entire Java Specialists Training in one huge bundle more...

Free Java Book

Dynamic Proxies in Java Book
Java Training

We deliver relevant courses, by top Java developers to produce more resourceful and efficient programmers within their organisations.

Java Consulting

We can help make your Java application run faster and trouble-shoot concurrency and performance bugs...

Java Emergency?

If your system is down, we will review it for 15 minutes and give you our findings for just 1 € without any obligation.