Abstract: LinkedHashSet is a set that also maintains insertion order. To make it thread-safe, we can wrap it with Collections.synchronizedSet(). However, this is not a good option, because iteration would still be fail-fast. And what's the point of a thread-safe LinkedHashSet that we cannot iterate over? In this newsletter we try to create a concurrent set that behaves like a LinkedHashSet, but with minimal locking and with weakly-consistent iteration.
Welcome to the 296th edition of The Java(tm) Specialists' Newsletter, and warm greetings from a wet and cold Island of Crete. Our summer tourists would be quite amazed at how drenched we get. Which other place has had its bad weather inscribed into the Holy Bible? (Saint Paul got caught in a hurricane while trying to navigate the South of Crete and after a shipwreck ended up on Malta.) Oh, and we have earthquakes galore. One of my friends sent me this email a few days ago: "Earthquakes happen very often now... Is it safe to stay there?" Good question indeed. No idea. But it is a nice place to live when it ain't rockin' and rollin', or raining cats and dogs.
javaspecialists.teachable.com: Please visit our new self-study course catalog to see how you can upskill your Java knowledge.
What's wrong with this code?
private final Set<Connection> connections = Collections.synchronizedSet(new LinkedHashSet<>());
   During a recent talk, a friend showed a class with such a field. Someone
   complained that this would "encourage programmers to use
   Collections.synchronizedSet() in production code. A concurrent
   collection would be preferable."
   
   Since I have a particular interest in concurrency, I jumped to my friend's
   defense. Writing slides is difficult. Not every snippet of code we show
   is an example of perfect production code. Furthermore, a synchronized
   collection is not always slower than a ConcurrentHashMap. After all,
   ConcurrentHashMap itself maintains its invariants with
   synchronized.
   
   I do not know whether the synchronized-wrapped LinkedHashSet would be slow or
   not. Uncontended synchronization is fast. Since we are doing networking, I would
   expect that to be far more costly than a little lock. A bigger concern to me
   is that LinkedHashSet iteration is fail-fast. The synchronized wrapper
   would not protect us against a ConcurrentModificationException.
   We could lock the entire set during iteration, but iteration
   has O(n) cost, and thus locking might not be such a good idea.
   In a threaded environment, weakly-consistent iteration is preferable.
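   To see the problem concretely, here is a small sketch (my own demo class,
   not from the original code) showing both the fail-fast iteration and the
   documented workaround of holding the set's monitor for the entire
   iteration:

```java
import java.util.*;

public class SynchronizedSetIterationDemo {
  public static void main(String... args) {
    Set<String> set = Collections.synchronizedSet(new LinkedHashSet<>());
    set.add("hello");
    set.add("world");
    try {
      for (String s : set) {
        // structural modification while iterating triggers the
        // fail-fast check on the next call to next()
        set.add("Goodbye");
      }
    } catch (ConcurrentModificationException expected) {
      System.out.println("Caught " + expected);
    }
    // The workaround documented in Collections.synchronizedSet():
    // the caller must hold the set's lock while iterating.
    synchronized (set) {
      for (String s : set) {
        System.out.println(s);
      }
    }
  }
}
```

   Note that the synchronized block only prevents other threads from
   interfering; it blocks them for the full O(n) iteration.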
   
   I began to wonder how we could create a concurrent form of
   LinkedHashSet. Instead of TreeSet, we
   can use the concurrent, thread-safe ConcurrentSkipListSet.
   Similarly, instead of HashSet, we could use
   ConcurrentHashMap.newKeySet(). But LinkedHashSet
   did not seem to have an obvious concurrent alternative.
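   For reference, here is a minimal sketch (an illustrative demo of my own)
   of those two drop-in replacements. The ConcurrentSkipListSet keeps its
   elements in natural order; the key set backed by a ConcurrentHashMap
   iterates in an unspecified order, which is exactly why it cannot stand in
   for a LinkedHashSet:

```java
import java.util.*;
import java.util.concurrent.*;

public class ConcurrentCounterpartsDemo {
  public static void main(String... args) {
    // TreeSet -> ConcurrentSkipListSet: sorted, weakly-consistent iteration
    NavigableSet<Integer> sorted = new ConcurrentSkipListSet<>();
    Collections.addAll(sorted, 3, 1, 2);
    System.out.println(sorted); // natural order: [1, 2, 3]

    // HashSet -> ConcurrentHashMap.newKeySet(): thread-safe, but the
    // iteration order is unspecified, not insertion order
    Set<Integer> hashed = ConcurrentHashMap.newKeySet();
    Collections.addAll(hashed, 3, 1, 2);
    System.out.println(hashed.size()); // 3
  }
}
```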
   
   Here is my attempt, combining a ConcurrentHashMap with a
   ConcurrentSkipListSet.
   Our class offers a reduced interface of Set, thus only:
   
add(e), remove(e), contains(e), stream(), clear(), iterator() and toString()
   The ConcurrentSkipListSet maintains the insertion order. The
   ConcurrentHashMap ensures that elements are distinct.  We store
   the element and insertion order inside the InsertionOrder
   record. These are then stored inside the
   ConcurrentSkipListSet, sorted by the insertion order.
   We store our element as a key inside the ConcurrentHashMap.
   The values are the same InsertionOrders that are inside the
   ConcurrentSkipListSet.
   
   When we add an element, we call the computeIfAbsent() method
   on our map. If the element does not exist in the map yet, we create our
   InsertionOrder and add it to the set. When we want to remove it
   again, we use computeIfPresent() to also remove it from the set.
   Both of these compute methods are performed atomically on a
   ConcurrentHashMap.
   
   Here is the class. Please shout
   if you can think of a better approach for a
   thread-safe concurrent LinkedHashSet. I have not
   done extensive testing on this class. Neither have I benchmarked the
   performance. I have thus no idea whether it is faster or slower than the
   original synchronized-wrapped LinkedHashSet. I'm not even sure
   that the code is correct. Please do not use it in production without
   extensive testing. And when (not if) you find glaring errors, please let me
   know so that I can update this newsletter :-)
   
   Note: Small change to the original newsletter, thanks to
   Jesper Udby. Instead of a static shared AtomicLong, we let each
   set contain its own and pass the order into the record.
   Thanks Jesper! Also thanks to Cor Takken for
   suggesting better names for the fields and record.
   
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;
public class ConcurrentLinkedReducedHashSet<E> implements Iterable<E> {
  /**
   * A tuple holding our value and its insertion order.
   */
  private record InsertionOrder<T>(T value, long order) { }
  /**
   * Contains a mapping from our element to its insertionOrder.
   */
  private final Map<E, InsertionOrder<E>> elements =
      new ConcurrentHashMap<>();
  /**
   * The insertion order maintained in a ConcurrentSkipListSet,
   * so that we iterate in the correct order.
   */
  private final Set<InsertionOrder<E>> elementsOrderedByInsertion =
      new ConcurrentSkipListSet<>(
          Comparator.comparingLong(InsertionOrder::order));
  /**
   * AtomicLong for generating the next insertion order.
   */
  private final AtomicLong nextOrder = new AtomicLong();
  public boolean add(E e) {
    var added = new AtomicBoolean(false);
    elements.computeIfAbsent(
        e, key -> {
          var holder = new InsertionOrder<>(e,
            nextOrder.getAndIncrement());
          elementsOrderedByInsertion.add(holder);
          added.set(true);
          return holder;
        }
    );
    return added.get();
  }
  public boolean remove(E e) {
    var removed = new AtomicBoolean(false);
    elements.computeIfPresent(e, (key, holder) -> {
      elementsOrderedByInsertion.remove(holder);
      removed.set(true);
      return null; // will remove the entry
    });
    return removed.get();
  }
  public boolean contains(E e) {
    return elements.containsKey(e);
  }
  public Stream<E> stream() {
    return elementsOrderedByInsertion.stream().map(InsertionOrder::value);
  }
  public void clear() {
    // slow, but ensures we remove all entries in both collections
    stream().forEach(this::remove);
  }
  @Override
  public Iterator<E> iterator() {
    return stream().iterator();
  }
  @Override
  public String toString() {
    return stream()
        .map(String::valueOf)
        .collect(Collectors.joining(", ", "[", "]"));
  }
}
   In this demo, we iterate in insertion order
   without a ConcurrentModificationException:
   
public class WeaklyConsistentOrderedDemo {
  public static void main(String... args) {
//    var set = Collections.synchronizedSet(new LinkedHashSet<String>()); // CME
//    var set = ConcurrentHashMap.<String>newKeySet(); // works, wrong order
    var set = new ConcurrentLinkedReducedHashSet<String>(); // Perfect (maybe)
    set.add("hello");
    set.add("world");
    Iterator<String> it = set.iterator();
    set.add("Goodbye");
    while (it.hasNext()) {
      String next = it.next();
      System.out.println(next);
    }
  }
}
Output from our reduced set is:
hello
world
Goodbye
In the next demo, we remove "hello" after having iterated past it, and add it again. We would expect "hello" to thus show up at the end, and it does indeed.
public class WeaklyConsistentOrderedDemoComplex {
  public static void main(String... args) {
    var set = new ConcurrentLinkedReducedHashSet<String>();
    set.add("hello");
    set.add("world");
    set.add("Goodbye");
    Iterator<String> it = set.iterator();
    System.out.println(it.next()); // hello
    System.out.println(it.next()); // world
    set.remove("hello");
    set.add("hello");
    System.out.println(it.next()); // Goodbye
    System.out.println(it.next()); // hello
    System.out.println(it.hasNext()); // false
  }
}
The output is:
hello
world
Goodbye
hello
false
So far, so good. Next we have a demo that removes and adds 1.6 million random numbers between 0 and 9 into our set. There should be no duplicates at the end of the run, if everything is working.
import java.util.concurrent.*;
public class ConcurrentUpdatesDemo {
  public static void main(String... args) throws InterruptedException {
    var set = new ConcurrentLinkedReducedHashSet<Integer>();
    ExecutorService pool = Executors.newFixedThreadPool(16);
    for (int i = 0; i < 16; i++) {
      pool.submit(() -> {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        for (int j = 0; j < 100_000; j++) {
          int value = random.nextInt(0, 10);
          set.remove(value);
          set.add(value);
        }
      });
    }
    pool.shutdown();
    while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
      System.out.println("Waiting for pool to shut down");
    }
    System.out.println("set = " + set);
  }
}
That also seems to work:
set = [7, 6, 1, 3, 5, 2, 8, 9, 0, 4]
   At the outset, I said that this would be a reduced set that
   only has the methods that we are going to use. But what if
   we need a java.util.Set? We could throw
   UnsupportedOperationException for all the
   methods, besides those we implemented.
   
   This is surprisingly easy to do if you've read my book on 
   dynamic proxies :-) [Or grab my course on dynamic proxies in Java
   here.]
   We first create a Set that throws
   UnsupportedOperationException for all methods:
   
Set<String> angrySet = Proxies.castProxy(
    Set.class,
    (p, m, a) -> {
      throw new UnsupportedOperationException(
          m.getName() + "() not implemented");
    }
);
   Next we wrap that inside a dynamic object adapter, with our
   ConcurrentLinkedReducedHashSet as the adapter. The dynamic
   object adapter will give precedence to our methods. It will thus throw an
   UnsupportedOperationException whenever we try to call another
   method from the Set. The resulting type will be a
   Set and we could add more methods as needed.
   
Set<String> set = Proxies.adapt(
    Set.class, // target interface
    angrySet, // adaptee
    new ConcurrentLinkedReducedHashSet<>() // adapter
);
A complete demo would look like this:
// Maven: eu.javaspecialists.books.dynamicproxies:core:2.0.0
import eu.javaspecialists.books.dynamicproxies.*;
import java.util.*;
public class DynamicProxiesDemo {
  public static void main(String... args) {
    Set<String> angrySet = Proxies.castProxy(
        Set.class,
        (p, m, a) -> {
          throw new UnsupportedOperationException(
              m.getName() + "() not implemented");
        }
    );
    Set<String> set = Proxies.adapt(
        Set.class, // target interface
        angrySet, // adaptee
        new ConcurrentLinkedReducedHashSet<>() // adapter
    );
    set.add("hello");
    set.add("world");
    Iterator<String> it = set.iterator();
    set.add("Goodbye");
    while (it.hasNext()) {
      String next = it.next();
      System.out.println(next);
    }
    set.clear();
    set.addAll(List.of("one")); // UnsupportedOperationException
  }
}
Again, no guarantees about how fast this will be, nor whether it works at all. I would be delighted to see a better solution, using the standard JDK classes.
Kind regards
Heinz
We are always happy to receive comments from our readers. Feel free to send me a comment via email or discuss the newsletter in our JavaSpecialists Slack Channel (Get an invite here)
We deliver relevant courses, by top Java developers to produce more resourceful and efficient programmers within their organisations.