Running on Java 22-ea+27-2262 (Preview)
Home of The JavaSpecialists' Newsletter

297Measuring ForkJoinPool Parallelism

Author: Dr Heinz M. KabutzDate: 2022-01-31Java Version: 17Category: Performance
 

Abstract: Java has support for parallelism baked into the JDK. We have parallel streams, parallel sort and CompletableFutures, all using the same common ForkJoinPool. In this newsletter we explore how to measure the effectiveness of our parallelism.

 

Welcome to the 297th edition of The Java(tm) Specialists' Newsletter. Last week the country shut down for two days due to an expected snowfall. It was chaotic and my poor mother-in-law was stuck in Athens Airport for over 34 hours. Not fun. But the sun was shining again today, which meant we got our walk and run on the beach, followed by my daily push-up routine and a splash in the sea. My target for January was 2000 push-ups, and fortunately I even exceeded that by a bit. February will be tougher with a target of 3000.

javaspecialists.teachable.com: Please visit our new self-study course catalog to see how you can upskill your Java knowledge.

Measuring ForkJoinPool Parallelism

One of my side projects is trying to add a parallelMultiply() method to BigInteger. It makes sense once the numbers become large enough. The Toom Cook 3 algorithm is fairly easy to parallelize. Unfortunately we cannot achieve perfect parallelism, because a major bottleneck is the memory access. One of the reviewers asked how efficient the parallel algorithm is versus the sequential. The latency is certainly less, but what about the CPU consumption? Does this increase or stay the same?

This is not that easy to figure out. As far as I could tell, the Java Microbenchmarking Harness (JMH) does not give us easy access to these numbers. I had written some code in the past to tickle this information from the ForkJoinPool by writing a custom thread factory. It worked well enough, but at the time I didn't need it that desperately and thus stopped developing it.

The trick is to create a special ForkJoinWorkerThread factory by implementing the ForkJoinPool.ForkJoinWorkerThreadFactory. This is then set with the VM parameter -Djava.util.concurrent.ForkJoinPool.common.parallelism=OurFactoryClassName. Things went wonky when I hit a rather gnarly class loader bug (JDK-8280772) that seems to have crept into Java 12.

To make things easier, I set the property inside my ForkJoinPoolBench, hopefully before the common ForkJoinPool is used for the first time. I used the ideas from my ByteWatcher to also measure the amount of bytes allocated.

Before I show the ForkJoinPoolBench code, I'd like to demonstrate its usefulness. We can easily measure that sorting an int array in parallel is faster on a multi-core machine than sorting it sequentially. But which uses more CPU time? Let's try it out:

import eu.javaspecialists.tjsn.bench.*;

import java.util.*;
import java.util.concurrent.*;

public class ArraySortBenchTest {
  public static void main(String... args) {
    int[] array = ThreadLocalRandom.current()
        .ints(100_000_000).toArray();

    for (int i = 0; i < 3; i++) {
      int[] sequentialToSort = array.clone();
      ForkJoinPoolBench.test(
          () -> Arrays.sort(sequentialToSort),
          new DefaultStatsListener("sequentialSort" + i));
    }

    for (int i = 0; i < 3; i++) {
      int[] parallelToSort = array.clone();
      ForkJoinPoolBench.test(
          () -> Arrays.parallelSort(parallelToSort),
          new DefaultStatsListener("parallelSort" + i));
    }
  }
}

Running this on a 1-6-2 server with Java 17, we get the following results:

Results for sequentialSort2
real  0m6.841s
user  0m6.840s
sys   0m0.000s
mem   344.0B
Results for parallelSort2
real  0m0.863s
user  0m9.200s
sys   0m0.010s
mem   387.0MB

We can thus say that the parallel sort was 7.9x faster in real time, but used 1.35x as much CPU time in total. The parallel sort also allocated 448 MB/s. However, we must treat these figures with caution. We only have 6 cores on that particular server, so how can it be almost 8x faster with real time? It is possible that the two sorting algorithms are slightly different.

Here is another example, calculating 200k! using first a sequential stream and then a parallel stream. The parallel is faster, wildly so, but not for the reasons we might imagine. The parallelism is incidental to the performance improvement.

import eu.javaspecialists.tjsn.bench.*;

import java.math.*;
import java.util.stream.*;

public class FactorialByStreamDemo {
  public static void main(String... args) {
    ForkJoinPoolBench.test(
        () -> IntStream.rangeClosed(1, 200_000)
            .mapToObj(BigInteger::valueOf)
            .reduce(BigInteger.ONE, BigInteger::multiply),
        new DefaultStatsListener("sequentialFactorial"));

    ForkJoinPoolBench.test(
        () -> IntStream.rangeClosed(1, 200_000)
            .parallel()
            .mapToObj(BigInteger::valueOf)
            .reduce(BigInteger.ONE, BigInteger::multiply),
        new DefaultStatsListener("parallelFactorial"));
  }
}

Here is the output from the run:

Results for sequentialFactorial
real  0m9.838s
user  0m9.670s
sys   0m0.106s
mem   52.9GB
Results for parallelFactorial
real  0m0.281s
user  0m1.760s
sys   0m0.056s
mem   1.5GB

On a 1-6-2 machine, the parallel is 35x faster and uses 35x less memory. What gives?

Let's rewrite the algorithm using CompletableFuture, either as serial or as parallel using thenCombineAsync().

import eu.javaspecialists.tjsn.bench.*;

import java.math.*;
import java.util.concurrent.*;
import java.util.function.*;

public class FactorialByCompletableFutureDemo {
  private static final
  BinaryOperator<CompletableFuture<BigInteger>> SERIAL =
      (a, b) -> a.thenCombine(b, BigInteger::multiply);
  private static final
  BinaryOperator<CompletableFuture<BigInteger>> PARALLEL =
      (a, b) -> a.thenCombineAsync(b, BigInteger::multiply);

  private static BigInteger factorial(
      int n, BinaryOperator<CompletableFuture<BigInteger>> op) {
    return factorial(0, n, op).join();
  }

  private static CompletableFuture<BigInteger> factorial(
      int from, int to,
      BinaryOperator<CompletableFuture<BigInteger>> op) {
    if (from == to) {
      BigInteger result = from == 0 ? BigInteger.ONE :
          BigInteger.valueOf(from);
      return CompletableFuture.completedFuture(result);
    }
    int mid = (from + to) >>> 1;
    return op.apply(factorial(from, mid, op),
        factorial(mid + 1, to, op));
  }

  public static void main(String... args) {
    ForkJoinPoolBench.test(
        () -> factorial(2_000_000, SERIAL),
        new DefaultStatsListener("sequentialFactorial"));

    ForkJoinPoolBench.test(
        () -> factorial(2_000_000, PARALLEL),
        new DefaultStatsListener("parallelFactorial"));
  }
}

Note that we increased the factorial to 2 million. Here are the performance results:

Results for sequentialFactorial
real  0m8.396s
user  0m8.190s
sys   0m0.131s
mem   27.3GB
Results for parallelFactorial
real  0m5.461s
user  0m13.260s
sys   0m1.706s
mem   27.4GB

We see that the memory usage was roughly the same for sequential and parallel. The parallel was only 1.54x faster in real time, and used 1.62x as much user time. System time was also substantial for the parallel version. Memory is being allocated at a rate of 5 GB/s.

Why is the performance not better? Well, the way that the algorithm works is that we are multiplying larger and larger numbers together. The final three muliplications take a big chunk of time. For the last two seconds of the run, we are doing the final multiplication. This is what my parallelMultiply() method improves (hopefully Java 19, but may end up in 20 or 21). Here are the results if we run this with my parallelMultiply() method:

Results for parallelFactorial
real  0m2.656s
user  0m18.160s
sys   0m3.271s
mem   27.4GB

The real time is now 3.16x faster than the sequential version and we are using 2.58x more cpu time. At 10 GB/s, we are probably close to the memory bandwidth of my server.

The ForkJoinPool may occasionally create additional threads, enabled by the ManagedBlocker. One example of a class that has from the outset incorporated the ManagedBlocker is the Phaser. Here is an example of the ForkJoinPoolBench using the Phaser:

import eu.javaspecialists.tjsn.bench.*;

import java.util.concurrent.*;
import java.util.stream.*;

public class PhaserBenchTest {
  public static void main(String... args) {
    System.out.println("Phaser");
    ForkJoinPoolBench.test(() -> {
      int upto = 4 * Runtime.getRuntime().availableProcessors();
      var phaser = new Phaser(upto);
      IntStream.range(0, upto)
          .parallel()
          .forEach(ignored -> phaser.arriveAndAwaitAdvance());
      System.out.println("done");
    }, (realTime, userTimeStats, cpuTimeStats, allocationStats)
        -> System.out.println(
        "realTime = " + realTime + ",\n" +
            " userTimeStats = " + userTimeStats + ",\n" +
            " cpuTimeStats = " + cpuTimeStats + ",\n" +
            " allocationStats = " + allocationStats));
  }
}

When we look at the output, we see that we now have 48 data points, thus our 12 hardware threads x 4.

Phaser
done
realTime = 23203070,
 userTimeStats = LongSummaryStatistics{count=48, sum=10000047,
    min=1, average=208334.312500, max=10000000},
 cpuTimeStats = LongSummaryStatistics{count=48, sum=37473564,
    min=282691, average=780699.250000, max=12217508},
 allocationStats = LongSummaryStatistics{count=48, sum=251047,
    min=1041, average=5230.145833, max=74336}

Without much more fanfare, here is my ForkJoinPoolBench. It works well enough, but is not suitable to be run in production. Please let me know if you find this useful.

/*
 * (C)opyright 2022, Heinz Kabutz, All rights reserved
 */
package eu.javaspecialists.tjsn.bench;

import javax.management.*;
import java.lang.management.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

/**
 * @author Dr Heinz M. Kabutz heinz@javaspecialists.eu
 */
public final class ForkJoinPoolBench {
  /**
   * The key to this bench is the ThreadFactory, which measures
   * the cpu time, user time and memory allocation of each thread
   * that is created in the common ForkJoinPool. The number of
   * threads might increase temporarily because of the
   * ManagedBlocker, and we never remove unused threads again.
   */
  public static class ThreadFactory
      implements ForkJoinPool.ForkJoinWorkerThreadFactory {
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
      var thread =
          ForkJoinPool.defaultForkJoinWorkerThreadFactory
              .newThread(pool);
      bench.addCounters(thread);
      return thread;
    }
  }

  /**
   * We run the task in our bench, measuring user time, cpu time,
   * real time and bytes allocated.
   */
  public static void test(Runnable task, StatsListener listener) {
    bench.test0(task, listener);
  }

  /**
   * Class to simplify the ConcurrentHashMap usage.
   */
  private static class MeasureMap extends
      ConcurrentHashMap<Thread, AtomicLong> {}

  private final MeasureMap userTime = new MeasureMap();
  private final MeasureMap cpuTime = new MeasureMap();
  private final MeasureMap allocation = new MeasureMap();

  /**
   * Interface to simplify the ToLongFunction code.
   */
  @FunctionalInterface
  private interface ExtractorFunction {
    long extract(Thread thread);
  }

  private ForkJoinPoolBench() {}

  /**
   * Only one thread at a time can run the test.
   */
  private static final Object TEST_MONITOR = new Object();

  private void test0(Runnable task, StatsListener listener) {
    LongSummaryStatistics userStats, cpuStats, memStats;
    long realTime;
    synchronized (TEST_MONITOR) {
      addCounters(Thread.currentThread());
      try {
        resetAllCounters();
        realTime = System.nanoTime();
        try {
          task.run();
        } finally {
          memStats = getStats(allocation, MEM_FUNCTION);
          realTime = System.nanoTime() - realTime;
          userStats = getStats(userTime, USER_TIME_FUNCTION);
          cpuStats = getStats(cpuTime, CPU_TIME_FUNCTION);
        }
      } finally {
        removeCounters(Thread.currentThread());
      }
    }
    listener.result(realTime, userStats, cpuStats, memStats);
  }

  private LongSummaryStatistics getStats(
      MeasureMap map,
      ExtractorFunction extractorFunction) {
    map.forEach((key, value) -> {
      long after = extractorFunction.extract(key);
      long before = value.get();
      value.set(after - before);
    });
    return map.values()
        .stream()
        .mapToLong(AtomicLong::get)
        .summaryStatistics();
  }

  private void addCounters(Thread thread) {
    add(userTime, thread, USER_TIME_FUNCTION);
    add(cpuTime, thread, CPU_TIME_FUNCTION);
    add(allocation, thread, MEM_FUNCTION);
  }

  private void removeCounters(Thread thread) {
    userTime.remove(thread);
    cpuTime.remove(thread);
    allocation.remove(thread);
  }

  private void add(MeasureMap map,
                   Thread thread,
                   ExtractorFunction extractor) {
    map.put(thread, new AtomicLong(extractor.extract(thread)));
  }

  private void resetAllCounters() {
    resetCounter(userTime, USER_TIME_FUNCTION);
    resetCounter(cpuTime, CPU_TIME_FUNCTION);
    resetCounter(allocation, MEM_FUNCTION);
  }

  private void resetCounter(MeasureMap map,
                            ExtractorFunction extractor) {
    map.forEach((thread, value) ->
        value.set(extractor.extract(thread)));
  }

  private static long threadAllocatedBytes(Thread thread) {
    try {
      return (long) mBeanServer.invoke(name,
          "getThreadAllocatedBytes",
          new Object[]{thread.getId()},
          SIGNATURE);
    } catch (JMException e) {
      throw new IllegalArgumentException(e);
    }
  }

  private static final ThreadMXBean tmb =
      ManagementFactory.getThreadMXBean();
  private static final ExtractorFunction USER_TIME_FUNCTION =
      thread -> tmb.getThreadUserTime(thread.getId());
  private static final ExtractorFunction CPU_TIME_FUNCTION =
      thread -> tmb.getThreadCpuTime(thread.getId());
  private static final ExtractorFunction MEM_FUNCTION =
      ForkJoinPoolBench::threadAllocatedBytes;
  private static final String[] SIGNATURE =
      {long.class.getName()};
  private static final MBeanServer mBeanServer =
      ManagementFactory.getPlatformMBeanServer();
  private static final ObjectName name;
  private static final ForkJoinPoolBench bench;

  static {
    System.setProperty("java.util.concurrent.ForkJoinPool" +
        ".common.threadFactory", ThreadFactory.class.getName());
    if (!(ForkJoinPool.commonPool()
        .getFactory() instanceof ThreadFactory))
      throw new IllegalStateException(
          "Common pool thread factory should be a " +
              ThreadFactory.class.getName());
    try {
      name = new ObjectName(ManagementFactory.THREAD_MXBEAN_NAME);
    } catch (MalformedObjectNameException e) {
      throw new ExceptionInInitializerError(e);
    }
    bench = new ForkJoinPoolBench();
  }

  public interface StatsListener {
    void result(long realTime,
                LongSummaryStatistics userTimeStats,
                LongSummaryStatistics cpuTimeStats,
                LongSummaryStatistics allocationStats);
  }
}

And a DefaultStatsListener that prints the information out the way that I showed above in my output:

package eu.javaspecialists.tjsn.bench;

import java.util.*;
import java.util.concurrent.*;

public class DefaultStatsListener implements
    ForkJoinPoolBench.StatsListener {
  private final String description;

  public DefaultStatsListener(String description) {
    this.description = description;
  }

  public void result(long realTime,
                     LongSummaryStatistics userTimeStats,
                     LongSummaryStatistics cpuTimeStats,
                     LongSummaryStatistics allocationStats) {
    long userTime = userTimeStats.getSum();
    long sysTime = cpuTimeStats.getSum() - userTime;
    long bytes = allocationStats.getSum();

    System.out.println("Results for " + description);
    System.out.println("real  " + formatTime(realTime));
    System.out.println("user  " + formatTime(userTime));
    System.out.println("sys   " + formatTime(sysTime));
    System.out.println("mem   " + formatMemory(bytes));
  }

  private static String formatMemory(double bytes) {
    double val;
    String unitStr;
    if (bytes < 1024) {
      val = bytes;
      unitStr = "B";
    } else if (bytes < 1024 * 1024) {
      val = bytes / 1024;
      unitStr = "KB";
    } else if (bytes < 1024 * 1024 * 1024) {
      val = bytes / (1024 * 1024);
      unitStr = "MB";
    } else if (bytes < 1024 * 1024 * 1024 * 1024L) {
      val = bytes / (1024 * 1024 * 1024L);
      unitStr = "GB";
    } else {
      val = bytes / (1024 * 1024 * 1024 * 1024L);
      unitStr = "TB";
    }
    return String.format(Locale.US, "%.1f%s", val, unitStr);
  }

  private static String formatTime(long nanos) {
    if (nanos < 0) nanos = 0;
    long timeInMs = TimeUnit.NANOSECONDS.toMillis(nanos);
    long minutes = timeInMs / 60_000;
    double remainingMs = (timeInMs % 60_000) / 1000.0;
    return String.format(Locale.US, "%dm%.3fs", minutes, remainingMs);
  }
}

I hope you enjoyed this and found it interesting, perhaps even useful. Many greetings from Crete!

Kind regards

Heinz

 

Comments

We are always happy to receive comments from our readers. Feel free to send me a comment via email or discuss the newsletter in our JavaSpecialists Slack Channel (Get an invite here)

When you load these comments, you'll be connected to Disqus. Privacy Statement.

Related Articles

Browse the Newsletter Archive

About the Author

Heinz Kabutz Java Conference Speaker

Java Champion, author of the Javaspecialists Newsletter, conference speaking regular... About Heinz

Superpack '23

Superpack '23 Our entire Java Specialists Training in one huge bundle more...

Free Java Book

Dynamic Proxies in Java Book
Java Training

We deliver relevant courses, by top Java developers to produce more resourceful and efficient programmers within their organisations.

Java Consulting

We can help make your Java application run faster and trouble-shoot concurrency and performance bugs...