Author: Dr Heinz M. KabutzDate: 2019-11-23Java Version: 13Category: Tips and Tricks

Abstract: In our previous newsletter we enhanced Java 8 Streams by decorating them with an EnhancedStream class. The code had a lot of repetition, which often leads to bugs if written by hand. In this newsletter we use a dynamic proxy to create an EnhancedStream. The resulting code is shorter and more consistent.


Welcome to the 275th edition of The Java(tm) Specialists' Newsletter. Please visit our new self-study course catalog to see how you can upskill your Java knowledge.

EnhancedStream with Dynamic Proxy

In our previous newsletter we presented an EnhancedStream that allowed a more flexible approach to managing distinctness. We implemented Stream and changed all those methods that returned Stream, to instead return EnhancedStream. I'm not good at such mundane tasks (few humans are) and forgot a few.

We start by changing EnhancedStream to be a subinterface of Stream. This contains our new distinct() method, then all the methods whose return type we need to change to EnhancedStream, and lastly two static factory methods of() and from().

import java.lang.reflect.*;
import java.util.*;
import java.util.function.*;

public interface EnhancedStream<T> extends Stream<T> {
  EnhancedStream<T> distinct(ToIntFunction<T> hashCode,
                             BiPredicate<T, T> equals,
                             BinaryOperator<T> merger);
  EnhancedStream<T> filter(Predicate<? super T> predicate);
  <R> EnhancedStream<R> map(
      Function<? super T, ? extends R> mapper);
  <R> EnhancedStream<R> flatMap(
      Function<? super T, ? extends Stream<? extends R>> mapper);
  EnhancedStream<T> distinct();
  EnhancedStream<T> sorted();
  EnhancedStream<T> sorted(Comparator<? super T> comparator);
  EnhancedStream<T> peek(Consumer<? super T> action);
  EnhancedStream<T> limit(long maxSize);
  EnhancedStream<T> skip(long n);
  EnhancedStream<T> takeWhile(Predicate<? super T> predicate);
  EnhancedStream<T> dropWhile(Predicate<? super T> predicate);
  EnhancedStream<T> sequential();
  EnhancedStream<T> parallel();
  EnhancedStream<T> unordered();
  EnhancedStream<T> onClose(Runnable closeHandler);

  // static factory methods
  static <E> EnhancedStream<E> of(E... elements) {
    return from(Stream.of(elements));
  static <E> EnhancedStream<E> from(Stream<E> stream) {
    return (EnhancedStream<E>) Proxy.newProxyInstance(
        new Class<?>[] {EnhancedStream.class},
        new EnhancedStreamHandler<>(stream)

Our EnhancedStreamHandler contains the Key and the Stream delegate that we had inside the EnhancedStream class in our previous newsletter. Furthermore, we find the enhanced distinct() method and create a methodMap of all the remaining methods from EnhancedStream to Stream. That way we can quickly find the correct method on our delegate.

All method calls are routed via the invoke() method. Inside invoke(), we first decide whether the method is our enhanced distinct() method. If it is, we call that directly. Otherwise, if the return type is EnhancedStream, we find the matching method in our methodMap and invoke that on our delegate. In this case we return the proxy, which is an instance of type EnhancedStream. Alternatively we return the result of calling the method directly on our delegate.

Here is the EnhancedStreamHandler:

import java.lang.reflect.*;
import java.util.*;
import java.util.function.*;

public class EnhancedStreamHandler<T>
    implements InvocationHandler {
  private static final class Key<E> {
    private final E e;
    private final ToIntFunction<E> hashCode;
    private final BiPredicate<E, E> equals;

    public Key(E e, ToIntFunction<E> hashCode,
               BiPredicate<E, E> equals) {
      this.e = e;
      this.hashCode = hashCode;
      this.equals = equals;

    public int hashCode() {
      return hashCode.applyAsInt(e);

    public boolean equals(Object obj) {
      if (!(obj instanceof Key)) return false;
      Key<E> that = (Key<E>) obj;
      return equals.test(this.e, that.e);

  private Stream<T> delegate;

  public EnhancedStreamHandler(Stream<T> delegate) {
    this.delegate = delegate;

  private static final Method enhancedDistinct;

  static {
    try {
      enhancedDistinct = EnhancedStream.class.getMethod(
          "distinct", ToIntFunction.class, BiPredicate.class,
    } catch (NoSuchMethodException e) {
      throw new Error(e);

  private static final Map<Method, Method> methodMap =
          .filter(m -> !m.equals(enhancedDistinct))
          .filter(m -> !Modifier.isStatic(m.getModifiers()))
              m -> {
                try {
                  return Stream.class.getMethod(
                      m.getName(), m.getParameterTypes());
                } catch (NoSuchMethodException e) {
                  throw new Error(e);

  public Object invoke(Object proxy, Method method,
                       Object[] args) throws Throwable {
    if (method.equals(enhancedDistinct)) {
      return distinct(
          (EnhancedStream<T>) proxy,
          (ToIntFunction<T>) args[0],
          (BiPredicate<T, T>) args[1],
          (BinaryOperator<T>) args[2]);
    } else if (method.getReturnType() == EnhancedStream.class) {
      Method match = methodMap.get(method);
      this.delegate = (Stream) match.invoke(delegate, args);
      return proxy;
    } else {
      return method.invoke(this.delegate, args);

  private EnhancedStream<T> distinct(EnhancedStream<T> proxy,
                                     ToIntFunction<T> hashCode,
                                     BiPredicate<T, T> equals,
                                     BinaryOperator<T> merger) {
    delegate = delegate.collect(Collectors.toMap(
        t -> new Key<>(t, hashCode, equals),
    return proxy;

Our client code looks exactly the same as before. Here is again our BeachDistinctify class:

import java.util.function.*;

public class BeachDistinctify {
  public static void main(String... args) {
    EnhancedStream.of("Kalathas", "Stavros", "STAVROS",
            "marathi", "kalathas", "baLos", "Balos")
        .distinct(HASH_CODE, EQUALS, MERGE)

  // case insensitive hashCode() and equals()
  public static final
  ToIntFunction<String> HASH_CODE =
      s -> s.toUpperCase().hashCode();
  public static final
  BiPredicate<String, String> EQUALS =
      (s1, s2) ->

  // keep the string with the highest total ascii value
  public static final
  BinaryOperator<String> MERGE =
      (s1, s2) ->
          s1.chars().sum() < s2.chars().sum() ? s2 : s1;

There are some disadvantages with this new approach. There might be a slight method call overhead with some of the proxied methods. For example, each time our return type is EnhancedStream, we need to do a map lookup. Secondly, return values might need to be boxed from primitives to objects and back. Thirdly, methods suffer from amnesia; they have to check every single time that we have the permission to invoke them. We deal with these issues in the upcoming book Dynamic Proxies in Java, which we are publishing as a free e-book on InfoQ. Would you like to help us by reviewing the book? Please sign up here.

Kind regards




We are always happy to receive comments from our readers.

