Abstract: A few years ago, we tried to create a new distinct() function for a Stream. It wasn't pretty. In this newsletter we revisit that adventure using the new Stream Gatherers from Java 24.
Welcome to our 326th edition, sent from the Island of Crete. JCrete is happening next week, and the JCretans are already arriving to enjoy what this magical place has to offer. They are in good company. We have over 6 million visitors per year (well, not us personally, but sometimes it feels like it). Walkolution came to film me running on the beach. We rendezvoused before our rooster Arthur had crowed his first volley. Fortunately my usual beach buddies were still fast asleep, as they would have mocked me without mercy ;-) Only my friends from Highways Today spotted me.
javaspecialists.teachable.com: Please visit our new self-study course catalog to see how you can deepen your Java knowledge.
In 2019, in Newsletter 274 - EnhancedStream, we explored how we could define a different distinct() operation in a stream. We did this by writing our own implementation of the Stream interface and then adding a specialized distinct() method with pluggable mechanisms for determining uniqueness. It was a lot of work to implement all the Stream methods. We then showed another approach, using a reflection-based dynamic proxy, in Newsletter 275 - EnhancedStream with Dynamic Proxy. Neither approach was particularly satisfying: the first was too long, the second too mysterious.
When Victor Klang first spoke about Stream Gatherers, I immediately wondered whether they would offer a more natural solution. Remember that we wanted a slightly different way of finding distinct values: not just using hashCode() and equals(), but perhaps using other functions. In our example from the previous newsletters, we wanted case-insensitive distinct operations on these beach names: "Kalathas", "Stavros", "STAVROS", "marathi", "kalathas", "baLos", "Balos". If two names matched, for example "STAVROS" and "Stavros", we wanted to keep the one whose characters had the highest total ASCII value.
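To see which spelling should win in each case, here is a small sketch that simply adds up the char values of each candidate, the same measure that the MERGE function in BeachDistinctify uses further down. The class name AsciiSums is our own, for illustration only.

```java
public class AsciiSums {
    // Sum of all char values in the string, as used by the MERGE function
    static int sum(String s) {
        return s.chars().sum();
    }

    public static void main(String... args) {
        String[][] pairs = {
                {"Kalathas", "kalathas"},
                {"Stavros", "STAVROS"},
                {"baLos", "Balos"}
        };
        // Print both spellings of each matching pair with their sums
        for (String[] pair : pairs) {
            System.out.printf("%s=%d %s=%d%n",
                    pair[0], sum(pair[0]), pair[1], sum(pair[1]));
        }
    }
}
```

This prints Kalathas=809 kalathas=841, Stavros=754 STAVROS=562 and baLos=497 Balos=497. So "kalathas" and "Stavros" win, while "baLos" and "Balos" tie. Since MERGE only switches to the second string when its sum is strictly higher, the one encountered first is kept on a tie.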
To do this, we create a DistinctifyGatherer, which takes three functions as parameters. The first calculates the hash code of an element. The second determines whether two elements are equal. The last decides which of two matching elements we keep. Here is the code:
```java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

public class DistinctifyGatherer {
  public static <T> Gatherer<T, ?, T> of(
      ToIntFunction<T> hashCode,
      BiPredicate<T, T> equals,
      BinaryOperator<T> merger) {
    // Wraps each element so that the map uses our hashCode and equals functions
    class Key {
      private final T t;
      public Key(T t) {this.t = t;}
      public int hashCode() {
        return hashCode.applyAsInt(t);
      }
      public boolean equals(Object obj) {
        return obj instanceof Key that
            && equals.test(this.t, that.t);
      }
    }
    return Gatherer.<T, Map<Key, Key>, T>ofSequential(
        // initializer: LinkedHashMap keeps the encounter order
        LinkedHashMap::new,
        // integrator: store or merge each element, never push downstream
        (state, element, _) -> {
          var key = new Key(element);
          var existing = state.get(key);
          if (existing != null) {
            key = new Key(merger.apply(
                existing.t, key.t));
          }
          state.put(key, key);
          return true;
        },
        // finisher: emit the survivors while downstream still accepts them
        (keys, downstream) -> keys.values().stream()
            .takeWhile(_ -> !downstream.isRejecting())
            .map(key -> key.t)
            .forEach(downstream::push)
    );
  }
}
```
Nice and simple. For now we just use a sequential gatherer, since we want to preserve the encounter order of the elements in our LinkedHashMap. Note also that our integrator (the second argument) never sends anything downstream; we only do that in the finisher at the end. To avoid unnecessary work in the finisher, we stop pushing elements as soon as the downstream starts rejecting them.
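The buffer-then-emit pattern is not specific to distinctness. As a hedged illustration, here is a hypothetical reversed() gatherer (our own name, not part of the JDK) that only buffers in its integrator and pushes from its finisher, checking isRejecting() so that a downstream limit() can end the emission early. It also wraps the integrator with Gatherer.Integrator.ofGreedy, which marks it as one that never short-circuits on its own.

```java
import java.util.*;
import java.util.stream.*;

public class BufferingGathererDemo {
    // Hypothetical gatherer: buffers every element, emits them in reverse at the end
    static <T> Gatherer<T, ?, T> reversed() {
        return Gatherer.<T, ArrayList<T>, T>ofSequential(
                ArrayList::new,
                // integrator: only buffer, never push downstream
                Gatherer.Integrator.ofGreedy((buffer, element, _) -> {
                    buffer.add(element);
                    return true;
                }),
                // finisher: push in reverse order, stop once downstream rejects
                (buffer, downstream) -> {
                    for (int i = buffer.size() - 1;
                         i >= 0 && !downstream.isRejecting(); i--) {
                        downstream.push(buffer.get(i));
                    }
                });
    }

    public static void main(String... args) {
        List<Integer> firstTwo = Stream.of(1, 2, 3, 4, 5)
                .gather(reversed())
                .limit(2)
                .toList();
        System.out.println(firstTwo); // [5, 4]
    }
}
```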
Here is our BeachDistinctify, almost exactly the same as in Newsletters 274 and 275, except that this time round we are using our new Gatherer:
```java
import java.util.function.*;
import java.util.stream.*;

public class BeachDistinctify {
  public static void main(String... args) {
    Stream.of("Kalathas", "Stavros", "STAVROS", "marathi",
            "kalathas", "baLos", "Balos")
        .gather(DistinctifyGatherer.of(
            HASH_CODE, EQUALS, MERGE))
        .forEach(System.out::println);
  }

  // case insensitive hashCode() and equals()
  public static final ToIntFunction<String> HASH_CODE =
      s -> s.toUpperCase().hashCode();
  public static final BiPredicate<String, String> EQUALS =
      (s1, s2) -> s1.toUpperCase().equals(s2.toUpperCase());

  // keep the string with the highest total ascii value
  public static final BinaryOperator<String> MERGE =
      (s1, s2) -> s1.chars().sum() < s2.chars().sum() ? s2 : s1;
}
```
This again prints the following:
```
kalathas
Stavros
marathi
baLos
```
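Notice that "kalathas" comes out in first position, where "Kalathas" was first seen, even though the lowercase spelling won the merge. Re-inserting an existing key into a LinkedHashMap does not change its insertion order; only the value is replaced. In the JDK's HashMap implementation the original key object also stays in the map, so reading keySet() instead of values() in the finisher would return the unmerged originals. Here is a minimal sketch with plain String keys (the class and method names are ours, for illustration):

```java
import java.util.*;

public class ReinsertionOrder {
    // Hypothetical helper mimicking what the integrator does for "Kalathas"/"kalathas"
    static List<String> values() {
        Map<String, String> map = new LinkedHashMap<>();
        map.put("KALATHAS", "Kalathas");
        map.put("STAVROS", "Stavros");
        // Re-inserting an existing key replaces the value but keeps its position
        map.put("KALATHAS", "kalathas");
        return new ArrayList<>(map.values());
    }

    public static void main(String... args) {
        System.out.println(values()); // [kalathas, Stavros]
    }
}
```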
We can also use this for "distinctifying" our original use case, the methods of a class:
```java
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.stream.*;

public class MethodDistinctify {
  public static void main(String... args) {
    System.out.println("Normal ArrayDeque clone() Methods:");
    Stream.of(ArrayDeque.class.getMethods())
        .filter(method -> method.getName().equals("clone"))
        .forEach(MethodDistinctify::print);
    System.out.println();

    System.out.println("Distinct ArrayDeque:");
    Stream.of(ArrayDeque.class.getMethods())
        .filter(method -> method.getName().equals("clone"))
        .gather(DistinctifyGatherer.of(HASH_CODE, EQUALS, MERGE))
        .forEach(MethodDistinctify::print);
    System.out.println();

    System.out.println("Normal ConcurrentSkipListSet:");
    Stream.of(ConcurrentSkipListSet.class.getMethods())
        .filter(method -> method.getName().contains("Set"))
        .sorted(METHOD_COMPARATOR)
        .forEach(MethodDistinctify::print);
    System.out.println();

    System.out.println("Distinct ConcurrentSkipListSet:");
    Stream.of(ConcurrentSkipListSet.class.getMethods())
        .filter(method -> method.getName().contains("Set"))
        .gather(DistinctifyGatherer.of(HASH_CODE, EQUALS, MERGE))
        .sorted(METHOD_COMPARATOR)
        .forEach(MethodDistinctify::print);
  }

  private static void print(Method m) {
    System.out.println(
        Stream.of(m.getParameterTypes())
            .map(Class::getSimpleName)
            .collect(Collectors.joining(
                ", ",
                " " + m.getReturnType().getSimpleName() +
                    " " + m.getName() + "(",
                ")"))
    );
  }

  public static final ToIntFunction<Method> HASH_CODE =
      method -> method.getName().hashCode() +
          method.getParameterCount();

  public static final BiPredicate<Method, Method> EQUALS =
      (method1, method2) ->
          method1.getName().equals(method2.getName()) &&
              method1.getParameterCount() ==
                  method2.getParameterCount() &&
              Arrays.equals(method1.getParameterTypes(),
                  method2.getParameterTypes());

  public static final BinaryOperator<Method> MERGE =
      (method1, method2) -> {
        if (method1.getReturnType()
            .isAssignableFrom(method2.getReturnType()))
          return method2;
        if (method2.getReturnType()
            .isAssignableFrom(method1.getReturnType()))
          return method1;
        throw new IllegalArgumentException(
            "Conflicting return types " +
                method1.getReturnType().getCanonicalName() +
                " and " +
                method2.getReturnType().getCanonicalName());
      };

  public static final Comparator<Method> METHOD_COMPARATOR =
      Comparator.comparing(Method::getName)
          .thenComparing(method ->
              Arrays.toString(method.getParameterTypes()));
}
```
With the following output:
```
Normal ArrayDeque clone() Methods:
 ArrayDeque clone()
 Object clone()

Distinct ArrayDeque:
 ArrayDeque clone()

Normal ConcurrentSkipListSet:
 NavigableSet descendingSet()
 NavigableSet headSet(Object, boolean)
 NavigableSet headSet(Object)
 SortedSet headSet(Object)
 NavigableSet subSet(Object, boolean, Object, boolean)
 SortedSet subSet(Object, Object)
 NavigableSet subSet(Object, Object)
 NavigableSet tailSet(Object, boolean)
 NavigableSet tailSet(Object)
 SortedSet tailSet(Object)

Distinct ConcurrentSkipListSet:
 NavigableSet descendingSet()
 NavigableSet headSet(Object, boolean)
 NavigableSet headSet(Object)
 NavigableSet subSet(Object, boolean, Object, boolean)
 NavigableSet subSet(Object, Object)
 NavigableSet tailSet(Object, boolean)
 NavigableSet tailSet(Object)
```
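The extra entries in the "Normal" listings come from covariant return types: ArrayDeque declares clone() to return ArrayDeque, so the compiler also generates a synthetic bridge method that returns Object. A quick sketch to confirm this with Method.isBridge() (the class name BridgeCheck is ours, for illustration):

```java
import java.lang.reflect.Method;
import java.util.ArrayDeque;
import java.util.List;
import java.util.stream.Stream;

public class BridgeCheck {
    // The public clone() methods that getMethods() reports for ArrayDeque
    static List<Method> cloneMethods() {
        return Stream.of(ArrayDeque.class.getMethods())
                .filter(m -> m.getName().equals("clone"))
                .toList();
    }

    public static void main(String... args) {
        // getMethods() does not guarantee any particular order
        for (Method m : cloneMethods()) {
            System.out.println(m.getReturnType().getSimpleName()
                    + " clone() isBridge=" + m.isBridge());
        }
    }
}
```

We would expect "ArrayDeque clone()" to report isBridge=false and "Object clone()" to report isBridge=true, which is exactly the pair that our MERGE function collapses into one.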
This is much better, and a great improvement to what we can do with Streams in Java 24. Next up, I want to revisit our ShuffleCollector to make it work on the fly, perhaps with a shuffle "window" so that it can support infinite streams. Another thing I'd like to try is a gatherer for managing checked exceptions better. But that will have to wait until another day.
Kind regards
Heinz
P.S. I wrote this entire newsletter walking / standing on my new Walkolution 2. It is an incredible product that can enhance our productivity and make us healthier. I'm looking forward to being able to walk 8 hours a day whilst coding :-)
We are always happy to receive comments from our readers. Feel free to send me a comment via email or discuss the newsletter in our JavaSpecialists Slack Channel.