Home of The JavaSpecialists' Newsletter

Issue 274: EnhancedStream

Author: Dr Heinz M. Kabutz
Date: 2019-11-22
Java Version: 13
Category: Tips and Tricks
 

Abstract: Java 8 Streams have a rather narrow-minded view of what makes objects distinct. In this newsletter, we enhance Streams to allow a more flexible approach for deciding uniqueness.

 

Welcome to the 274th edition of The Java(tm) Specialists' Newsletter, sent from the beautiful Island of Crete. My wife and I went for our daily run on the beach, followed by a swim in the sea. I swam around the little island off Kalathas, whereas Helene stayed closer to shore. By the time I got out, she had already left. It was only when I picked up my jacket that I realized that I had hidden my car keys deep in her beach bag without telling her. My phone was safely stowed in the glove box of my Suzuki Jimny. All I had with me was my Geecon Poland t-shirt, my Devoxx UK hoodie, my running shorts, and a pair of flip-flops. After thinking for a few seconds, I realized that it would be unkind to hitch a lift in my wet shorts, so I started jogging up the hill back to Chorafakia and my house. As uncomfortable as flip-flops are for running on the road, barefoot would have been worse. Joanna (famous gym instructor in Greece with her own TV show) sped past me in her black BMW and gave me the thumbs up. "That guy is hardcore - running in flip-flops" probably went through her mind, except in Greek. I arrived three kilometers later and saw with delight that my wife's car was still at home. It took Helene a while to understand exactly why I had chosen to leave the car at the beach ...


EnhancedStream

I've been wearing an Oura ring for about a year. This marvelous little device monitors my heart rate and other characteristics whilst I sleep. It doesn't do much else besides tracking my sleep quality. Here is what it said happened last night:

As you can see, I woke up several times. I also know why. Let me explain. You might be amused, or think I'm crazy, or both. When I told my friend John Green this story, he assured me that I was "special". Funny, my junior school teachers used those exact same words.

I dream code. Sometimes I will write long and complicated programs in my sleep. Last night I kept on being woken up by the realization and frustration that Java 8 Streams did not have good support for distinct streams. Yes, we have the distinct() call, but that always uses equals() to decide uniqueness. We cannot specify our own equality characteristics.
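To make the limitation concrete, here is a minimal sketch (the class name PlainDistinctDemo is made up for this illustration) showing that the built-in distinct() only removes elements that are equal according to equals(). Strings that differ only in case all survive:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PlainDistinctDemo {
  // distinct() compares with equals(), so only the exact
  // duplicate "Balos" is removed; the case variants remain
  static List<String> plainDistinct() {
    return Stream.of("Balos", "BALOS", "Balos", "baLos")
        .distinct()
        .collect(Collectors.toList());
  }

  public static void main(String... args) {
    System.out.println(plainDistinct());
  }
}
```

There is no overload of distinct() that accepts a custom notion of equality, which is the gap the rest of this newsletter tries to fill.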

For example, let's say that we want to create a distinct stream of students. We want to make it distinct by where they got their BSc. Since John and I both did our BSc at the University of Cape Town, only one of us should remain in the stream. But which one? We are not exactly the same object. For example, we might want to keep the student that had the highest marks for his BSc. Or we might want to instead keep the student with the better looks.
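The student scenario could be sketched roughly as follows, using only the standard Collectors.toMap(). The Student class, its fields, and the names and marks are all hypothetical, invented purely for this illustration. The university acts as the key, and the merge function keeps the student with the higher marks:

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StudentDistinctDemo {
  // Hypothetical Student type, invented for this illustration
  static final class Student {
    final String name;
    final String university;
    final int marks;

    Student(String name, String university, int marks) {
      this.name = name;
      this.university = university;
      this.marks = marks;
    }
  }

  // One student per university, keeping the highest marks
  static List<String> bestPerUniversity(Stream<Student> students) {
    return students
        .collect(Collectors.toMap(
            student -> student.university,
            Function.identity(),
            BinaryOperator.maxBy(
                Comparator.comparingInt((Student s) -> s.marks)),
            LinkedHashMap::new))
        .values().stream()
        .map(student -> student.name)
        .collect(Collectors.toList());
  }

  // Made-up sample data
  static List<String> demo() {
    return bestPerUniversity(Stream.of(
        new Student("Alice", "UCT", 71),
        new Student("Bob", "Stellenbosch", 80),
        new Student("Carol", "UCT", 78)));
  }

  public static void main(String... args) {
    System.out.println(demo());
  }
}
```

Swapping the comparator is all it takes to keep a different student, for instance one chosen by some other attribute.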

Whilst writing my book on Dynamic Proxies in Java, I keep on bumping into such coding challenges. For example, when I get a stream of methods from ArrayDeque, I want them to be distinct by method name and parameter types. I cannot use the distinct() stream method, because equals() in the Method class also considers the return type and the class in which the method is defined. I could map() my object to another type that has the desired definitions for equals() and hashCode(), but even then, I would need to have a way to merge objects when they are considered duplicates. Stream will just throw away one of the two elements, but we have no control over which one is discarded.

My standard approach has been to call collect(Collectors.toMap()), with a special class to manage equality as the key, together with a merge function to decide which value to keep. Once the map is constructed, I then call values().stream() to convert the map back to a stream of the correct type.
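A minimal sketch of that toMap() approach for methods might look like this. The class and helper names (ToMapDistinctDemo, signature, moreSpecific) are assumptions for this example, and a List of name plus parameter types stands in for the "special class" that manages equality:

```java
import java.lang.reflect.Method;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ToMapDistinctDemo {
  // Key with the desired equality: name plus parameter types
  static List<Object> signature(Method m) {
    return List.of(m.getName(),
        Arrays.asList(m.getParameterTypes()));
  }

  // Merge function: keep the more specific return type
  static Method moreSpecific(Method m1, Method m2) {
    return m1.getReturnType()
        .isAssignableFrom(m2.getReturnType()) ? m2 : m1;
  }

  static Stream<Method> distinctBySignature(
      Stream<Method> methods) {
    return methods
        .collect(Collectors.toMap(
            ToMapDistinctDemo::signature,
            Function.identity(),
            ToMapDistinctDemo::moreSpecific,
            LinkedHashMap::new))
        .values()  // back from the map ...
        .stream(); // ... to a Stream<Method>
  }

  public static void main(String... args) {
    distinctBySignature(
        Stream.of(ArrayDeque.class.getMethods())
            .filter(m -> m.getName().equals("clone")))
        .forEach(System.out::println);
  }
}
```

It works, but the collect-then-stream detour has to be repeated at every call site, and it interrupts the fluent pipeline.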

Last night in my sleep, I kept on stumbling back into this problem, thinking there has to be a better way. Surely there is some method on Stream itself that would distinctify the elements properly? After tossing and turning, I would wake up, realize that there was no way, and try to get back to sleep. This morning after my run back from the beach, I spent a long time carefully sifting through the methods in Stream and Collectors, but came up empty.

import java.util.*;
import java.util.function.*;
import java.util.stream.*;

public class EnhancedStream<T> implements Stream<T> {
  private static final class Key<E> {
    private final E e;
    private final ToIntFunction<E> hashCode;
    private final BiPredicate<E, E> equals;

    public Key(E e, ToIntFunction<E> hashCode,
               BiPredicate<E, E> equals) {
      this.e = e;
      this.hashCode = hashCode;
      this.equals = equals;
    }

    public int hashCode() {
      return hashCode.applyAsInt(e);
    }

    public boolean equals(Object obj) {
      if (!(obj instanceof Key)) return false;
      @SuppressWarnings("unchecked")
      Key<E> that = (Key<E>) obj;
      return equals.test(this.e, that.e);
    }
  }

  private Stream<T> delegate;

  public EnhancedStream(Stream<T> delegate) {
    this.delegate = delegate;
  }

  public EnhancedStream<T> distinct(ToIntFunction<T> hashCode,
                                    BiPredicate<T, T> equals,
                                    BinaryOperator<T> merger) {
    delegate = collect(Collectors.toMap(
          t -> new Key<>(t, hashCode, equals),
          Function.identity(),
          merger,
          LinkedHashMap::new))
        // thanks Federico Peralta Schaffner for suggesting that
        // we use a LinkedHashMap.  That way we can preserve the
        // original order and get the speed of hashing.
        .values()
        .stream();
    return this;
  }

  public EnhancedStream<T> filter(
      Predicate<? super T> predicate) {
    this.delegate = delegate.filter(predicate);
    return this;
  }

  public <R> EnhancedStream<R> map(
      Function<? super T, ? extends R> mapper) {
    return new EnhancedStream<>(delegate.map(mapper));
  }

  public IntStream mapToInt(ToIntFunction<? super T> mapper) {
    return delegate.mapToInt(mapper);
  }

  public LongStream mapToLong(
      ToLongFunction<? super T> mapper) {
    return delegate.mapToLong(mapper);
  }

  public DoubleStream mapToDouble(
      ToDoubleFunction<? super T> mapper) {
    return delegate.mapToDouble(mapper);
  }

  public <R> EnhancedStream<R> flatMap(
      Function<? super T,
          ? extends Stream<? extends R>> mapper) {
    return new EnhancedStream<>(delegate.flatMap(mapper));
  }

  public IntStream flatMapToInt(
      Function<? super T, ? extends IntStream> mapper) {
    return delegate.flatMapToInt(mapper);
  }

  public LongStream flatMapToLong(
      Function<? super T, ? extends LongStream> mapper) {
    return delegate.flatMapToLong(mapper);
  }

  public DoubleStream flatMapToDouble(
      Function<? super T, ? extends DoubleStream> mapper) {
    return delegate.flatMapToDouble(mapper);
  }

  public EnhancedStream<T> distinct() {
    delegate = delegate.distinct();
    return this;
  }

  public EnhancedStream<T> sorted() {
    delegate = delegate.sorted();
    return this;
  }

  public EnhancedStream<T> sorted(
      Comparator<? super T> comparator) {
    delegate = delegate.sorted(comparator);
    return this;
  }

  public EnhancedStream<T> peek(Consumer<? super T> action) {
    delegate = delegate.peek(action);
    return this;
  }

  public EnhancedStream<T> limit(long maxSize) {
    delegate = delegate.limit(maxSize);
    return this;
  }

  public EnhancedStream<T> skip(long n) {
    delegate = delegate.skip(n);
    return this;
  }

  public EnhancedStream<T> takeWhile(
      Predicate<? super T> predicate) {
    delegate = delegate.takeWhile(predicate);
    return this;
  }

  public EnhancedStream<T> dropWhile(
      Predicate<? super T> predicate) {
    delegate = delegate.dropWhile(predicate);
    return this;
  }

  public void forEach(Consumer<? super T> action) {
    delegate.forEach(action);
  }

  public void forEachOrdered(Consumer<? super T> action) {
    delegate.forEachOrdered(action);
  }

  public Object[] toArray() {
    return delegate.toArray();
  }

  public <A> A[] toArray(IntFunction<A[]> generator) {
    return delegate.toArray(generator);
  }

  public T reduce(T identity, BinaryOperator<T> accumulator) {
    return delegate.reduce(identity, accumulator);
  }

  public Optional<T> reduce(BinaryOperator<T> accumulator) {
    return delegate.reduce(accumulator);
  }

  public <U> U reduce(U identity,
                      BiFunction<U, ? super T, U> accumulator,
                      BinaryOperator<U> combiner) {
    return delegate.reduce(identity, accumulator, combiner);
  }

  public <R> R collect(Supplier<R> supplier,
                       BiConsumer<R, ? super T> accumulator,
                       BiConsumer<R, R> combiner) {
    return delegate.collect(supplier, accumulator, combiner);
  }

  public <R, A> R collect(
      Collector<? super T, A, R> collector) {
    return delegate.collect(collector);
  }

  public Optional<T> min(Comparator<? super T> comparator) {
    return delegate.min(comparator);
  }

  public Optional<T> max(Comparator<? super T> comparator) {
    return delegate.max(comparator);
  }

  public long count() {
    return delegate.count();
  }

  public boolean anyMatch(Predicate<? super T> predicate) {
    return delegate.anyMatch(predicate);
  }

  public boolean allMatch(Predicate<? super T> predicate) {
    return delegate.allMatch(predicate);
  }

  public boolean noneMatch(Predicate<? super T> predicate) {
    return delegate.noneMatch(predicate);
  }

  public Optional<T> findFirst() {
    return delegate.findFirst();
  }

  public Optional<T> findAny() {
    return delegate.findAny();
  }

  public Iterator<T> iterator() {
    return delegate.iterator();
  }

  public Spliterator<T> spliterator() {
    return delegate.spliterator();
  }

  public boolean isParallel() {
    return delegate.isParallel();
  }

  public EnhancedStream<T> sequential() {
    delegate = delegate.sequential();
    return this;
  }

  public EnhancedStream<T> parallel() {
    delegate = delegate.parallel();
    return this;
  }

  public EnhancedStream<T> unordered() {
    delegate = delegate.unordered();
    return this;
  }

  public EnhancedStream<T> onClose(Runnable closeHandler) {
    delegate = delegate.onClose(closeHandler);
    return this;
  }

  public void close() {
    delegate.close();
  }

  public static <T> EnhancedStream<T> of(T t) {
    return new EnhancedStream<>(Stream.of(t));
  }

  @SafeVarargs
  @SuppressWarnings("varargs")
  // Creating a stream from an array is safe
  public static <T> EnhancedStream<T> of(T... values) {
    return new EnhancedStream<>(Arrays.stream(values));
  }
}
  

To help me sleep better tonight, I thought I'd create an EnhancedStream. The normal Java 8 Stream API does not lend itself to adding additional functions. However, we can do it using the decorator design pattern. My EnhancedStream implements the Stream interface and delegates to an instance of type Stream. All methods that ordinarily return Stream have been enhanced to instead return EnhancedStream. I have left the primitive streams as an exercise to the reader.

Beaches on Crete

For example, let's create a Stream<String> of beach names. We want to have a case-insensitive equivalence relation. Furthermore, if two Strings are equivalent (ignoring any difference in case), then we want to keep the one where the sum of all the characters is the highest. If they are equal, we want to keep the first that we found.

Here are the hashCode and equals functions:

ToIntFunction<String> HASH_CODE =
    s -> s.toUpperCase().hashCode();
BiPredicate<String, String> EQUALS =
    (s1, s2) ->
        s1.toUpperCase().equals(s2.toUpperCase());
  

The merge function looks like this:

BinaryOperator<String> MERGE =
    (s1, s2) ->
        s1.chars().sum() < s2.chars().sum() ? s2 : s1;
  

We can use the EnhancedStream as follows:

EnhancedStream.of("Kalathas", "Stavros", "STAVROS",
        "marathi", "kalathas", "baLos", "Balos", "BALOS")
    .distinct(HASH_CODE, EQUALS, MERGE)
    .forEach(System.out::println);
  

With output of:

kalathas
Stavros
marathi
baLos
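Why does baLos win over Balos? A small sketch of the arithmetic helps. The class name MergeTieDemo and the helper charSum are made up for this illustration; the merge rule is the same as the one shown earlier. Both spellings add up to the same total, so the first one encountered is kept:

```java
import java.util.function.BinaryOperator;

class MergeTieDemo {
  // Same merge rule as above: on a tie the first string wins
  static final BinaryOperator<String> MERGE =
      (s1, s2) ->
          s1.chars().sum() < s2.chars().sum() ? s2 : s1;

  static int charSum(String s) {
    return s.chars().sum();
  }

  public static void main(String... args) {
    System.out.println(charSum("baLos")); // 497
    System.out.println(charSum("Balos")); // 497
    System.out.println(charSum("BALOS")); // 369
    // equal sums, so the earlier element survives
    System.out.println(MERGE.apply("baLos", "Balos")); // baLos
    // lowercase 'k' has a higher value than 'K'
    System.out.println(
        MERGE.apply("Kalathas", "kalathas")); // kalathas
  }
}
```

Note that kalathas still appears first in the output, because the LinkedHashMap keeps the position of the first key it saw and only the value is replaced by the merge.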
  

The complete code is here in our BeachDistinctify class:

import java.util.function.*;

public class BeachDistinctify {
  public static void main(String... args) {
    EnhancedStream.of("Kalathas", "Stavros", "STAVROS",
            "marathi", "kalathas", "baLos", "Balos")
        .distinct(HASH_CODE, EQUALS, MERGE)
        .forEach(System.out::println);
  }

  // case insensitive hashCode() and equals()
  public static final
  ToIntFunction<String> HASH_CODE =
      s -> s.toUpperCase().hashCode();
  public static final
  BiPredicate<String, String> EQUALS =
      (s1, s2) ->
          s1.toUpperCase().equals(s2.toUpperCase());

  // keep the string with the highest total ascii value
  public static final
  BinaryOperator<String> MERGE =
      (s1, s2) ->
          s1.chars().sum() < s2.chars().sum() ? s2 : s1;
}
  

Distinctifying Methods

The use case that kept on waking me up in a sweat was that of distinctifying methods. Here is an example of how we could use the EnhancedStream to produce a stream of unique methods.

import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.stream.*;

public class MethodDistinctify {
  public static void main(String... args) {
    System.out.println("Normal ArrayDeque clone() Methods:");
    EnhancedStream.of(ArrayDeque.class.getMethods())
        .filter(method -> method.getName().equals("clone"))
        .forEach(MethodDistinctify::print);

    System.out.println();

    System.out.println("Distinct ArrayDeque:");
    EnhancedStream.of(ArrayDeque.class.getMethods())
        .filter(method -> method.getName().equals("clone"))
        .distinct(HASH_CODE, EQUALS, MERGE)
        .forEach(MethodDistinctify::print);

    System.out.println();

    System.out.println("Normal ConcurrentSkipListSet:");
    EnhancedStream.of(ConcurrentSkipListSet.class.getMethods())
        .filter(method -> method.getName().contains("Set"))
        .sorted(METHOD_COMPARATOR)
        .forEach(MethodDistinctify::print);

    System.out.println();

    System.out.println("Distinct ConcurrentSkipListSet:");
    EnhancedStream.of(ConcurrentSkipListSet.class.getMethods())
        .filter(method -> method.getName().contains("Set"))
        .distinct(HASH_CODE, EQUALS, MERGE)
        .sorted(METHOD_COMPARATOR)
        .forEach(MethodDistinctify::print);
  }

  private static void print(Method m) {
    System.out.println(
        Stream.of(m.getParameterTypes())
            .map(Class::getSimpleName)
            .collect(Collectors.joining(
                ", ",
                "  " + m.getReturnType().getSimpleName()
                    + " " + m.getName() + "(",
                ")"))
    );
  }

  public static final ToIntFunction<Method> HASH_CODE =
      method -> method.getName().hashCode() +
          method.getParameterCount();

  public static final BiPredicate<Method, Method> EQUALS =
      (method1, method2) ->
          method1.getName().equals(method2.getName()) &&
              method1.getParameterCount() ==
                  method2.getParameterCount() &&
              Arrays.equals(method1.getParameterTypes(),
                  method2.getParameterTypes());

  public static final BinaryOperator<Method> MERGE =
      (method1, method2) -> {
        if (method1.getReturnType()
            .isAssignableFrom(method2.getReturnType()))
          return method2;
        if (method2.getReturnType()
            .isAssignableFrom(method1.getReturnType()))
          return method1;
        throw new IllegalArgumentException(
            "Conflicting return types " +
                method1.getReturnType().getCanonicalName() +
                " and " +
                method2.getReturnType().getCanonicalName());
      };

  public static final Comparator<Method> METHOD_COMPARATOR =
      Comparator.comparing(Method::getName)
          .thenComparing(method ->
              Arrays.toString(method.getParameterTypes()));
}
  

The output is as follows:

Normal ArrayDeque clone() Methods:
  ArrayDeque clone()
  Object clone()

Distinct ArrayDeque:
  ArrayDeque clone()

Normal ConcurrentSkipListSet:
  NavigableSet descendingSet()
  NavigableSet headSet(Object, boolean)
  SortedSet headSet(Object)
  NavigableSet headSet(Object)
  NavigableSet subSet(Object, boolean, Object, boolean)
  NavigableSet subSet(Object, Object)
  SortedSet subSet(Object, Object)
  NavigableSet tailSet(Object, boolean)
  SortedSet tailSet(Object)
  NavigableSet tailSet(Object)

Distinct ConcurrentSkipListSet:
  NavigableSet descendingSet()
  NavigableSet headSet(Object, boolean)
  NavigableSet headSet(Object)
  NavigableSet subSet(Object, boolean, Object, boolean)
  NavigableSet subSet(Object, Object)
  NavigableSet tailSet(Object, boolean)
  NavigableSet tailSet(Object)
  

We can thus see that the ArrayDeque has two different clone() methods, one returning an Object and the other returning an ArrayDeque. Our enhanced distinct() method returned the one with the most derived type. The same happened with the ConcurrentSkipListSet. Three of the methods had duplicates, and these were eliminated as part of our distinctifying.
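The "most derived type" decision rests on Class.isAssignableFrom(), whose direction is easy to get backwards. Here is a minimal sketch of the same check applied directly to classes. The class name AssignableDemo and the helper moreDerived are assumptions for this example:

```java
import java.util.ArrayDeque;
import java.util.NavigableSet;
import java.util.SortedSet;

class AssignableDemo {
  // Returns whichever of the two types is the more derived.
  // c1.isAssignableFrom(c2) is true when c2 is c1 or a subtype
  static Class<?> moreDerived(Class<?> c1, Class<?> c2) {
    if (c1.isAssignableFrom(c2)) return c2;
    if (c2.isAssignableFrom(c1)) return c1;
    throw new IllegalArgumentException(
        "Unrelated types " + c1.getName() +
            " and " + c2.getName());
  }

  public static void main(String... args) {
    // prints ArrayDeque
    System.out.println(
        moreDerived(Object.class, ArrayDeque.class)
            .getSimpleName());
    // prints NavigableSet, as NavigableSet extends SortedSet
    System.out.println(
        moreDerived(NavigableSet.class, SortedSet.class)
            .getSimpleName());
  }
}
```

This is why the SortedSet variants of headSet(), subSet() and tailSet() disappear from the distinct output, while their NavigableSet counterparts remain.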

Kind regards

Heinz

P.S. In case you're wondering what the word "distinctify" is, look no further than the Urban Dictionary ;-)

 
