Home of The JavaSpecialists' Newsletter

289 MergingSortedSpliterator

Author: Dr Heinz M. Kabutz
Date: 2021-04-03
Java Version: 11+
Category: Tips and Tricks
 

Abstract: If we have a List<Stream<T>>, each stream with sorted elements, how can we generate a sorted Stream<T>, taking one at a time from each stream? In this newsletter we show how to do that with the Stream API and by writing our own MergingSortedSpliterator.

 

Welcome to the 289th edition of The Java(tm) Specialists' Newsletter. I know, I know, 36 hours ago I sent you a newsletter after two months of radio silence, and here is another one? Well, when I see an interesting puzzle, I find myself unable to resist trying to solve it. And that's what happened this morning. So here we are - I hope you enjoy it :-)


MergingSortedSpliterator

As I rubbed the sleep out of my eyes this morning, I vaguely made out the shape of a tweet with my name on it. Antoine DESSAIGNE (@adessaigne) had tweeted "Hello @java folks 👋, If you have a List<Stream<T>>, each stream with sorted elements, how can you generate a sorted Stream<T>, taking one at a time from each stream? Managed to do that using iterators but is there a clever way to do it with Stream API? Thanks 👍"

None of the Java greats had proposed an easy or performant solution, so I grabbed my laptop and started typing. Antoine had apparently created his own solution using Iterators, and I had in mind a solution using Spliterators. A Spliterator can easily be converted to a Stream using StreamSupport.stream(). Having not seen Antoine's solution, I imagine that he did something along the same lines. Obviously we could just do:

list.stream()
  .flatMap(Function.identity())
  .sorted();

In that case the elements would be sorted twice: once inside each source stream and again after flattening. I wanted to avoid that. However, the second sort would not be as expensive as the first, because Timsort is efficient at sorting partially sorted input. In addition, if we parallelize the second sort, it will internally divide and conquer, and there is a good chance that we would end up with chunks of items that are already mostly sorted.
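As a baseline for comparison, here is a minimal sketch of that flatten-and-sort approach wrapped in a helper. The class name FlatMapSortedSketch and the method mergeBySorting are made up for illustration; the comparator is passed in explicitly so that the second sort uses the same ordering as the source streams.

```java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

// Hypothetical helper, not part of the newsletter's code.
class FlatMapSortedSketch {
  // Flattens the already sorted streams and sorts the result again.
  static <T> List<T> mergeBySorting(List<Stream<T>> sortedStreams,
                                    Comparator<? super T> order) {
    return sortedStreams.stream()
        .flatMap(Function.identity())
        .sorted(order)
        .collect(Collectors.toList());
  }

  public static void main(String... args) {
    List<Stream<Integer>> streams = List.of(
        Stream.of(1, 4, 7), Stream.of(2, 5, 8), Stream.of(0, 3, 6, 9));
    // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    System.out.println(mergeBySorting(streams, Comparator.naturalOrder()));
  }
}
```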

But more fun is to write our own MergingSortedSpliterator. Some constraints are: the streams that are passed in must have the SORTED characteristic, which also implies that their Spliterator returns the Comparator used for sorting (getComparator() returns null for natural ordering). All the streams must have used the same Comparator. The Comparators do not have to be the same instance, but they must be equal according to equals(). Another constraint is that this Spliterator does not support parallel streams. The characteristics are a bitwise AND of the characteristics of all the contained spliterators, and estimateSize() is the sum of their estimated sizes, using Long.MAX_VALUE if we overflow.
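To make the SORTED constraint concrete, the following sketch inspects a few spliterators the way the constructor would have to. The class SortedContractSketch and its describe method are illustrative names only; the calls themselves (hasCharacteristics, getComparator) are the standard Spliterator API.

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical helper for illustration, not part of the newsletter's code.
class SortedContractSketch {
  // Reports whether a spliterator is SORTED and, if so, how it is ordered.
  static String describe(Spliterator<?> spliterator) {
    if (!spliterator.hasCharacteristics(Spliterator.SORTED))
      return "not sorted"; // getComparator() would throw IllegalStateException
    Comparator<?> comparator = spliterator.getComparator();
    return comparator == null
        ? "sorted, natural order"
        : "sorted, custom comparator";
  }

  public static void main(String... args) {
    // prints "sorted, natural order"
    System.out.println(describe(Stream.of(3, 1, 2).sorted().spliterator()));
    // prints "not sorted"
    System.out.println(describe(Stream.of(3, 1, 2).spliterator()));

    TreeSet<Integer> descending = new TreeSet<>(Comparator.reverseOrder());
    descending.addAll(List.of(3, 1, 2));
    // prints "sorted, custom comparator"
    System.out.println(describe(descending.stream().spliterator()));
  }
}
```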

Note: We updated the class a bit by not reading the first element in the constructor. We also changed the spliterator to never be distinct, otherwise distinct() on the stream becomes a no-op. (suggestions by @tagir_valeev)
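The characteristics and size rules can be tried out in isolation. The sketch below is only an illustration under assumed names (CharacteristicsSketch, combine, saturatingSum): it ANDs the characteristics of two TreeSet spliterators, masks out DISTINCT, and adds sizes with saturation. Two sets such as {1, 2} and {2, 3} are each distinct, yet their merge contains 2 twice, which is why the flag has to go.

```java
import java.util.*;

// Hypothetical helper for illustration, not part of the newsletter's code.
class CharacteristicsSketch {
  // Bitwise AND of all characteristics, with DISTINCT always cleared.
  static <T> int combine(List<Spliterator<T>> parts) {
    return parts.stream()
        .mapToInt(Spliterator::characteristics)
        .reduce((a, b) -> a & b)
        .orElse(0) & ~Spliterator.DISTINCT;
  }

  // Adds two non-negative sizes, returning Long.MAX_VALUE on overflow.
  static long saturatingSum(long a, long b) {
    long sum = a + b;
    return sum < 0 ? Long.MAX_VALUE : sum;
  }

  public static void main(String... args) {
    Spliterator<Integer> left = new TreeSet<Integer>(List.of(1, 2)).spliterator();
    Spliterator<Integer> right = new TreeSet<Integer>(List.of(2, 3)).spliterator();
    int raw = left.characteristics() & right.characteristics();
    int masked = combine(List.of(left, right));
    System.out.println((raw & Spliterator.DISTINCT) != 0);    // true
    System.out.println((masked & Spliterator.DISTINCT) != 0); // false
    System.out.println((masked & Spliterator.SORTED) != 0);   // true
    System.out.println(saturatingSum(Long.MAX_VALUE, 1));     // 9223372036854775807
  }
}
```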

import java.util.*;
import java.util.function.*;
import java.util.stream.*;

public class MergingSortedSpliterator<T> implements Spliterator<T> {
  private final List<Spliterator<T>> spliterators;
  private final List<Iterator<T>> iterators;
  private final int characteristics;
  private final Object[] nextItem;
  private final static Object START_OF_STREAM = new Object();
  private final static Object END_OF_STREAM = new Object();
  private final Comparator<? super T> comparator;

  public MergingSortedSpliterator(Collection<Stream<T>> streams) {
    this.spliterators = streams.stream()
        .map(Stream::spliterator)
        .collect(Collectors.toList());
    if (!spliterators.stream().allMatch(
        spliterator -> spliterator.hasCharacteristics(SORTED)))
      throw new IllegalArgumentException("Streams must be sorted");
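    // The first comparator is the reference; null means natural ordering.
    Comparator<? super T> comparator = spliterators.isEmpty()
        ? null : spliterators.get(0).getComparator();
    for (Spliterator<T> spliterator : spliterators) {
      if (!Objects.equals(comparator, spliterator.getComparator()))
        throw new IllegalArgumentException("Mismatching comparators "
            + comparator + " and " + spliterator.getComparator());
    }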
    this.comparator = Objects.requireNonNullElse(comparator,
        (Comparator<? super T>) Comparator.naturalOrder());
    this.characteristics = spliterators.stream()
        .mapToInt(Spliterator::characteristics)
        .reduce((ch1, ch2) -> ch1 & ch2)
        .orElse(0) & ~DISTINCT; // Mask out DISTINCT

    // setting up iterators
    this.iterators = spliterators.stream()
        .map(Spliterators::iterator)
        .collect(Collectors.toList());
    nextItem = new Object[streams.size()];
    Arrays.fill(nextItem, START_OF_STREAM);
  }

  private Object fetchNext(Iterator<T> iterator) {
    return iterator.hasNext() ? iterator.next() : END_OF_STREAM;
  }

  public boolean tryAdvance(Consumer<? super T> action) {
    Objects.requireNonNull(action, "action==null");
    if (nextItem.length == 0) return false;
    T smallest = null;
    int smallestIndex = -1;
    for (int i = 0; i < nextItem.length; i++) {
      Object o = nextItem[i];
      if (o == START_OF_STREAM)
        nextItem[i] = o = fetchNext(iterators.get(i));
      if (o != END_OF_STREAM) {
        T t = (T) o;
        if (smallestIndex == -1 ||
            comparator.compare(t, smallest) < 0) {
          smallest = t;
          smallestIndex = i;
        }
      }
    }

    // smallest might be null if the stream contains nulls
    if (smallestIndex == -1) return false;

    nextItem[smallestIndex] =
        fetchNext(iterators.get(smallestIndex));

    action.accept(smallest);
    return true;
  }

  public Spliterator<T> trySplit() {
    // never split - parallel not supported
    return null;
  }

  public long estimateSize() {
    return spliterators.stream()
        .mapToLong(Spliterator::estimateSize)
        .reduce((ch1, ch2) -> {
          long result;
          if ((result = ch1 + ch2) < 0) result = Long.MAX_VALUE;
          return result;
        })
        .orElse(0);
  }

  public int characteristics() {
    return characteristics;
  }

  public Comparator<? super T> getComparator() {
    return comparator;
  }
}
  

We can then create a Stream from the Spliterator using StreamSupport.stream(new MergingSortedSpliterator<>(streams), false). The false means that the stream will be sequential and not parallel. As mentioned above, parallel is not supported.
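To show the Spliterator-to-Stream conversion in a self-contained form, here is a sketch of a different merging strategy: a heap-based k-way merge built on Spliterators.AbstractSpliterator. This is not the MergingSortedSpliterator above and not Antoine's iterator solution; the names HeapMergeSketch, Head and merge are invented for the example, and the caller is assumed to pass the comparator the streams were sorted with.

```java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

// Hypothetical alternative for illustration, not the newsletter's class.
class HeapMergeSketch {
  // Pairs the current head element of a source with the iterator it came from.
  private static final class Head<T> {
    final T value;
    final Iterator<T> source;
    Head(T value, Iterator<T> source) {
      this.value = value;
      this.source = source;
    }
  }

  static <T> Stream<T> merge(List<Stream<T>> sortedStreams,
                             Comparator<? super T> order) {
    Spliterator<T> merged = new Spliterators.AbstractSpliterator<T>(
        Long.MAX_VALUE, Spliterator.ORDERED) {
      private PriorityQueue<Head<T>> heap; // built on the first tryAdvance

      @Override
      public boolean tryAdvance(Consumer<? super T> action) {
        Objects.requireNonNull(action, "action==null");
        if (heap == null) {
          Comparator<Head<T>> byValue =
              (a, b) -> order.compare(a.value, b.value);
          heap = new PriorityQueue<>(byValue);
          for (Stream<T> stream : sortedStreams) {
            Iterator<T> iterator = stream.iterator();
            if (iterator.hasNext())
              heap.add(new Head<>(iterator.next(), iterator));
          }
        }
        Head<T> smallest = heap.poll();
        if (smallest == null) return false; // all sources exhausted
        if (smallest.source.hasNext())
          heap.add(new Head<>(smallest.source.next(), smallest.source));
        action.accept(smallest.value);
        return true;
      }
    };
    // false = sequential stream; this sketch is not meant for parallel use
    return StreamSupport.stream(merged, false);
  }

  public static void main(String... args) {
    List<Stream<Integer>> streams = List.of(
        Stream.of(1, 4, 7), Stream.of(2, 5, 8), Stream.of(0, 3, 6, 9));
    // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    System.out.println(merge(streams, Comparator.naturalOrder())
        .collect(Collectors.toList()));
  }
}
```

With k source streams, the heap picks the next element in O(log k) comparisons instead of scanning all k heads, which may matter when there are many streams. For a handful of streams the linear scan in tryAdvance above is likely just as good.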

Here is a demo of the MergingSortedSpliterator at work:

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class SortedStreamOfSortedStreams {
  private static final int SIZE = 5;

  public static void main(String... args) {
    List<Stream<Integer>> streams = List.of(
        generateSortedRandom(SIZE),
        generateSortedRandom(SIZE),
        generateSortedRandom(SIZE),
        generateSortedRandom(SIZE)
    );

    Stream<Integer> numbers = StreamSupport.stream(
        new MergingSortedSpliterator<>(streams), false
    );
    numbers.forEach(System.out::println);
  }

  private static Stream<Integer> generateSortedRandom(int size) {
    return ThreadLocalRandom.current().ints(size, 0, size * 4)
        .parallel()
        .sorted()
        .boxed();
  }
}
  

For example, we might see output like this:

0
0
2
4
4
5
6
6
7
10
10
11
12
15
16
17
18
18
19
19
  

I ran some performance tests and as I expected, my MergingSortedSpliterator is faster than a sorted flatMap. However, even faster, at least on my machine, is a parallel sorted flatMap. Here is a basic performance test:

import java.util.*;
import java.util.function.*;
import java.util.stream.*;

public class PerformanceTest {
  private static final int SIZE = 10_000_000;
  private static final
  List<Function<List<Stream<Integer>>, Stream<Integer>>> MERGERS =
      List.of(
          s -> s.stream()
              .flatMap(Function.identity())
              .sorted(),
          s -> s.stream()
              .flatMap(Function.identity())
              .parallel()
              .sorted(),
          s -> StreamSupport.stream(
              new MergingSortedSpliterator<>(s), false
          ));

  public static void main(String... args) {
    for (int i = 0; i < 10; i++) {
      test();
      System.out.println();
    }
  }

  private static void test() {
    MERGERS.forEach(merger -> {
      List<Stream<Integer>> streams = makeStreams();
      long time = System.nanoTime();
      try {
        Stream<Integer> numbers = merger.apply(streams);
        numbers.forEach(i -> { });
      } finally {
        time = System.nanoTime() - time;
        System.out.printf("time = %dms%n", (time / 1_000_000));
      }
    });
  }

  private static List<Stream<Integer>> makeStreams() {
    return Stream.generate(() -> generateSorted(SIZE))
        .limit(10).collect(Collectors.toList());
  }

  private static Stream<Integer> generateSorted(int size) {
    return IntStream.range(0, size).boxed();
  }
}
  

On my 1-8-2 MacBook Pro the best results were:

Sorted flatMap sequential: 7.8 seconds, 3.1 GB allocated
Sorted flatMap parallel:   2.1 seconds, 3.0 GB allocated
MergingSortedSpliterator:  5.2 seconds, 1.5 GB allocated
  

Thus the MergingSortedSpliterator is faster than the sequential sorted flatMap, requires fewer CPU cycles than the parallel version, and allocated roughly half the memory of either. Unless I was working with very large datasets, I would probably favour the sorted flatMap sequential version. The only catch is that we must make sure that the comparator passed to sorted() is the same as the one used for the original streams. My MergingSortedSpliterator takes care of that for us.

Thanks Antoine for the challenge. I enjoy solving puzzles like this, especially if they have some practical application and are not just interview questions or homework assignments :-)

Kind regards

Heinz

 

