
Issue 229: Cleaning ThreadLocals

Author: Dr. Heinz M. Kabutz
Date: 2015-05-23
Java Version: 8
Category: Tips and Tricks
 

Abstract: ThreadLocals should in most cases be avoided. They can solve some tough problems, but can introduce some nasty memory leaks, especially if the ThreadLocal subclass or its value refers to one of our own classes rather than a system class. In this newsletter we show some mechanisms that help clean them up under OpenJDK.

 

Welcome to the 229th edition of The Java(tm) Specialists' Newsletter, written mostly in Düsseldorf in Germany. Düsseldorf is mentioned in the 2005 rendition of Charlie and the Chocolate Factory. If you saw the movie, you would probably imagine Düsseldorf as a quaint German town in the mountains. Anything but! It is completely flat. Most of the buildings are modern, as 64% of the city was destroyed by 700 tons of bombs in 1942. But modern Düsseldorf pulses with energy. Lots of young people live here and the bars are very active indeed. We've done many courses here (in German) and I keep coming back. "Füchschen" is a firm favourite.


Cleaning ThreadLocals

Warning: The code in this newsletter contains bugs. When I copied the ThreadLocals via reflection, I forgot to copy the size and the threshold fields. This can cause a livelock, as pointed out by Robert Munteanu in A cautionary tale on thread locals, pooling and reflection. The improved code can be found via the links in that article. I chose to keep the code in this newsletter as it was, so that the astute reader can compare the two versions.

In most of my courses (master, concurrency, xj-conc), I mention ThreadLocal. It is always accompanied by stern warnings that we should avoid it if possible. ThreadLocal is designed for threads that are used and then discarded. Before the thread exits, all its thread local entries are erased. In fact, Why 0x61c88647? explains how the actual values are stored inside a map that lives within the thread. Threads in thread pools generally outlive a single user request, which makes them prone to memory leaks. Usually when I talk about this, at least one of my students has a story of how their production system collapsed because a thread local was hanging onto one of their classes, thus preventing the entire class loader from being unloaded. This problem is extremely common.
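To make the pool problem concrete, here is a minimal sketch (the class name and the USER thread local are made up for illustration) in which the second task on a single-threaded pool sees a value left behind by the first task, because nobody called remove():

```java
import java.util.concurrent.*;

// Hypothetical demo: a pooled thread keeps its ThreadLocal values
// from one task to the next unless somebody removes them.
class PooledThreadLeakDemo {
  private static final ThreadLocal<String> USER = new ThreadLocal<>();

  static String valueSeenBySecondTask() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      // First "request" sets the thread local but never calls remove()
      Runnable firstRequest = () -> USER.set("left over from task 1");
      // Second "request" runs on the same, single worker thread
      Callable<String> secondRequest = USER::get;
      pool.submit(firstRequest).get();
      return pool.submit(secondRequest).get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String... args) {
    System.out.println(valueSeenBySecondTask()); // left over from task 1
  }
}
```

In a real server that stale value might be an instance of one of our own classes, which keeps its class loader reachable for as long as the pooled thread lives.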

In this newsletter, I would like to demonstrate a ThreadLocalCleaner class that can restore the thread locals to their former glory when a thread is ready to go back into the pool. At the most basic level, we can save the state of the ThreadLocals for our current thread and then restore them again later. We would typically do this with the try-with-resources statement that was added in Java 7. For example:

try (ThreadLocalCleaner tlc = new ThreadLocalCleaner()) {
  // some code that potentially creates and adds thread locals
}
// at this point, the new thread locals have been cleared
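The ThreadLocalCleaner does this for every entry of the thread via reflection. For a single ThreadLocal that we already know about, the same save-and-restore idea can be sketched without any reflection. ThreadLocalScope below is a hypothetical helper, not part of the newsletter's code:

```java
// Hypothetical demo of a reflection-free save/restore for ONE known ThreadLocal.
class ThreadLocalScopeDemo {
  static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

  static String[] run() {
    CONTEXT.set("outer");
    String[] seen = new String[2];
    try (ThreadLocalScope<String> scope = new ThreadLocalScope<>(CONTEXT)) {
      CONTEXT.set("inner");      // code that changes the thread local
      seen[0] = CONTEXT.get();
    }                            // close() puts "outer" back
    seen[1] = CONTEXT.get();
    return seen;
  }

  public static void main(String... args) {
    String[] seen = run();
    System.out.println(seen[0] + " then " + seen[1]); // inner then outer
  }
}

// Remembers the value at construction time and restores it in close().
final class ThreadLocalScope<T> implements AutoCloseable {
  private final ThreadLocal<T> threadLocal;
  private final T previous;

  ThreadLocalScope(ThreadLocal<T> threadLocal) {
    this.threadLocal = threadLocal;
    // Note: get() calls initialValue() if there was no entry yet
    this.previous = threadLocal.get();
  }

  @Override
  public void close() {
    if (previous == null) {
      threadLocal.remove();      // nothing was there before, so drop the entry
    } else {
      threadLocal.set(previous);
    }
  }
}
```

Unlike the ThreadLocalCleaner, this only protects the ThreadLocals you explicitly wrap, and it cannot tell an absent entry from one that holds null.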

To make it easier to debug our systems, we also add an observer mechanism, so that we can be notified of any changes made to the thread local map before restoring it. This will help us discover possible thread local leaks. Here is our listener:

package threadcleaner;

@FunctionalInterface
public interface ThreadLocalChangeListener {
  void changed(Mode mode, Thread thread,
               ThreadLocal<?> threadLocal, Object value);

  ThreadLocalChangeListener EMPTY = (m, t, tl, v) -> {};

  ThreadLocalChangeListener PRINTER =
      (m, t, tl, v) -> System.out.printf(
          "Thread %s %s ThreadLocal %s with value %s%n",
          t, m, tl.getClass(), v);

  enum Mode {
    ADDED, REMOVED
  }
}

Some explanations might be necessary. First off, I have marked it as a @FunctionalInterface, which in Java 8 means that it has exactly one abstract method and can be implemented with a lambda. Secondly, I have defined an EMPTY lambda inside the interface. As you can see, the code for that is minimal. Thirdly, we also have a PRINTER constant, which simply prints each change to System.out. Lastly, there are two different kinds of events, ADDED and REMOVED. Since a @FunctionalInterface may only have one abstract method, I could not give each event its own method, so the kind of event is passed in as an extra parameter, in this case the Mode enum.

When we construct our ThreadLocalCleaner, we can also pass in a ThreadLocalChangeListener, which is notified, when the cleaner is closed, of any changes that occurred since we created our ThreadLocalCleaner. Please note that these mechanisms only apply to the current thread. Any resource declared in the try (...) header is automatically closed at the end of the block, which is why ThreadLocalCleaner has a close() method that restores the thread local entries to their previous values. Here is an example of how we could use the ThreadLocalCleaner in a try-with-resources block:

import java.text.*;

import threadcleaner.*;

public class ThreadLocalCleanerExample {
  private static final ThreadLocal<DateFormat> df =
      new ThreadLocal<DateFormat>() {
        protected DateFormat initialValue() {
          return new SimpleDateFormat("yyyy-MM-dd");
        }
      };

  public static void main(String... args) {
    System.out.println("First ThreadLocalCleaner context");
    try (ThreadLocalCleaner tlc = new ThreadLocalCleaner(
        ThreadLocalChangeListener.PRINTER)) {
      System.out.println(System.identityHashCode(df.get()));
      System.out.println(System.identityHashCode(df.get()));
      System.out.println(System.identityHashCode(df.get()));
    }

    System.out.println("Another ThreadLocalCleaner context");
    try (ThreadLocalCleaner tlc = new ThreadLocalCleaner(
        ThreadLocalChangeListener.PRINTER)) {
      System.out.println(System.identityHashCode(df.get()));
      System.out.println(System.identityHashCode(df.get()));
      System.out.println(System.identityHashCode(df.get()));
    }
  }
}
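It is worth knowing that the resource in a try-with-resources statement is closed even when the body throws, so the restore also happens on the error path. A small sketch with a made-up Tracker resource, standing in for the ThreadLocalCleaner, records the order of events:

```java
import java.util.ArrayList;
import java.util.List;

class CloseOnExceptionDemo {
  static List<String> run() {
    List<String> events = new ArrayList<>();
    try (Tracker tracker = new Tracker(events)) {
      events.add("body");
      throw new IllegalStateException("boom");
    } catch (IllegalStateException e) {
      // close() has already run by the time we get here
      events.add("caught " + e.getMessage());
    }
    return events;
  }

  public static void main(String... args) {
    System.out.println(run()); // [open, body, close, caught boom]
  }
}

// Hypothetical resource that just logs when it is opened and closed.
class Tracker implements AutoCloseable {
  private final List<String> events;

  Tracker(List<String> events) {
    this.events = events;
    events.add("open");
  }

  @Override
  public void close() {
    events.add("close");
  }
}
```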

We also have two public static methods in our ThreadLocalCleaner class: forEach() and cleanup(Thread). The forEach() method takes as parameters a thread and a BiConsumer, which is then called with the ThreadLocal and its value for each entry. We skip entries whose key has already been garbage collected (null keys), but not those with null values. The reason is that a ThreadLocal can still cause a memory leak, even if the value is null. Once we are done with a ThreadLocal, we should always call its remove() method before letting the thread return to the pool. The cleanup(Thread) method sets both thread local maps of that thread (the normal and the inheritable one) to null, thus allowing all of the entries to be garbage collected. If a ThreadLocal is used again after we have cleared it, its initialValue() method will simply be called to recreate the entry. Here are the method definitions:

public static void forEach(Thread thread,
      BiConsumer<ThreadLocal<?>, Object> consumer) { ... }

public static void cleanup(Thread thread) { ... }
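The claim that initialValue() is simply called again after an entry has been cleared is easy to check. Here is a sketch that uses ThreadLocal.withInitial() from Java 8 as a shortcut for overriding initialValue(); the BUFFER thread local and the counter are made up for the example. It also shows the try/finally pattern for calling remove() when a task is done:

```java
import java.util.concurrent.atomic.AtomicInteger;

class RemoveThenRecreateDemo {
  // Counts how often the initial value is created
  static final AtomicInteger created = new AtomicInteger();

  static final ThreadLocal<StringBuilder> BUFFER =
      ThreadLocal.withInitial(() -> {
        created.incrementAndGet();
        return new StringBuilder();
      });

  static int run() {
    created.set(0);
    try {
      BUFFER.get().append("request 1");
      BUFFER.get().append(", still request 1"); // same StringBuilder, no new one
    } finally {
      BUFFER.remove(); // clean up before the thread could go back to a pool
    }
    BUFFER.get();      // the entry is gone, so the supplier runs again
    BUFFER.remove();
    return created.get();
  }

  public static void main(String... args) {
    System.out.println(run() + " instances created"); // 2 instances created
  }
}
```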

The complete code of the ThreadLocalCleaner class is here. It uses a lot of reflection on private fields, so it will probably only work with the OpenJDK or direct derivatives. You will also notice that I am using Java 8 syntax. I debated with myself for a while whether I should use Java 8 or 7. Some of my clients are still on 1.4. However, most of my large banking customers are already using Java 8 in production. Banks are usually not the first adopters, except where it makes a lot of financial sense. Thus if you're not using Java 8 in production yet, you should probably look at migrating soon, or even skipping it and going straight to Java 9. You could backport this to Java 7 by writing your own BiConsumer interface and replacing the lambdas with anonymous inner classes. Java 6 does not have the nice try-with-resources construct, so I would not go that far back.
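The class below reaches into private fields of Thread and ThreadLocal. On Java 9 and later the module system can refuse that access, and from Java 16 onwards it does so by default unless the JVM is started with --add-opens java.base/java.lang=ALL-UNNAMED. A small check along these lines, a hypothetical helper built on the Java 9 trySetAccessible() method, can tell you up front whether the reflection will work:

```java
import java.lang.reflect.Field;

// Hypothetical helper: can this JVM open Thread.threadLocals,
// the field that the ThreadLocalCleaner depends on? Needs Java 9+.
class ReflectiveAccessProbe {
  static boolean canOpenThreadLocals() {
    try {
      Field field = Thread.class.getDeclaredField("threadLocals");
      // Returns false, instead of throwing InaccessibleObjectException,
      // when java.lang is not opened to our code
      return field.trySetAccessible();
    } catch (NoSuchFieldException e) {
      return false; // not an OpenJDK-style Thread implementation
    }
  }

  public static void main(String... args) {
    System.out.println(canOpenThreadLocals()
        ? "Reflective access to Thread.threadLocals is available"
        : "Try --add-opens java.base/java.lang=ALL-UNNAMED");
  }
}
```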

package threadcleaner;

import java.lang.ref.*;
import java.lang.reflect.*;
import java.util.*;
import java.util.function.*;

import static threadcleaner.ThreadLocalChangeListener.Mode.*;

public class ThreadLocalCleaner implements AutoCloseable {
  private final ThreadLocalChangeListener listener;

  public ThreadLocalCleaner() {
    this(ThreadLocalChangeListener.EMPTY);
  }

  public ThreadLocalCleaner(ThreadLocalChangeListener listener) {
    this.listener = listener;
    saveOldThreadLocals();
  }

  public void close() {
    cleanup();
  }

  public void cleanup() {
    diff(threadLocalsField, copyOfThreadLocals.get());
    diff(inheritableThreadLocalsField,
        copyOfInheritableThreadLocals.get());
    restoreOldThreadLocals();
  }

  public static void forEach(
      Thread thread,
      BiConsumer<ThreadLocal<?>, Object> consumer) {
    forEach(thread, threadLocalsField, consumer);
    forEach(thread, inheritableThreadLocalsField, consumer);
  }

  public static void cleanup(Thread thread) {
    try {
      threadLocalsField.set(thread, null);
      inheritableThreadLocalsField.set(thread, null);
    } catch (IllegalAccessException e) {
      throw new IllegalStateException(
          "Could not clear thread locals: " + e);
    }
  }

  private void diff(Field field, Reference<?>[] backup) {
    try {
      Thread thread = Thread.currentThread();
      Object threadLocals = field.get(thread);
      if (threadLocals == null) {
        if (backup != null) {
          for (Reference<?> reference : backup) {
            changed(thread, reference,
                REMOVED);
          }
        }
        return;
      }

      Reference<?>[] current =
          (Reference<?>[]) tableField.get(threadLocals);
      if (backup == null) {
        for (Reference<?> reference : current) {
          changed(thread, reference, ADDED);
        }
      } else {
        // nested loop - both arrays *should* be relatively small
        next:
        for (Reference<?> curRef : current) {
          if (curRef != null) {
            if (curRef.get() == copyOfThreadLocals ||
                curRef.get() == copyOfInheritableThreadLocals) {
              continue next;
            }
            for (Reference<?> backupRef : backup) {
              if (curRef == backupRef) continue next;
            }
            // could not find it in backup - added
            changed(thread, curRef, ADDED);
          }
        }
        next:
        for (Reference<?> backupRef : backup) {
          for (Reference<?> curRef : current) {
            if (curRef == backupRef) continue next;
          }
          // could not find it in current - removed
          changed(thread, backupRef,
              REMOVED);
        }
      }
    } catch (IllegalAccessException e) {
      throw new IllegalStateException("Access denied", e);
    }
  }

  private void changed(Thread thread, Reference<?> reference,
                       ThreadLocalChangeListener.Mode mode)
      throws IllegalAccessException {
    if (reference == null) return; // skip empty slots in the table
    listener.changed(mode,
        thread, (ThreadLocal<?>) reference.get(),
        threadLocalEntryValueField.get(reference));
  }

  private static Field field(Class<?> c, String name)
      throws NoSuchFieldException {
    Field field = c.getDeclaredField(name);
    field.setAccessible(true);
    return field;
  }

  private static Class<?> inner(Class<?> clazz, String name) {
    for (Class<?> c : clazz.getDeclaredClasses()) {
      if (c.getSimpleName().equals(name)) {
        return c;
      }
    }
    throw new IllegalStateException(
        "Could not find inner class " + name + " in " + clazz);
  }

  private static void forEach(
      Thread thread, Field field,
      BiConsumer<ThreadLocal<?>, Object> consumer) {
    try {
      Object threadLocals = field.get(thread);
      if (threadLocals != null) {
        Reference<?>[] table = (Reference<?>[])
            tableField.get(threadLocals);
        for (Reference<?> ref : table) {
          if (ref != null) {
            ThreadLocal<?> key = (ThreadLocal<?>) ref.get();
            if (key != null) {
              Object value = threadLocalEntryValueField.get(ref);
              consumer.accept(key, value);
            }
          }
        }
      }
    } catch (IllegalAccessException e) {
      throw new IllegalStateException(e);
    }
  }

  private static final ThreadLocal<Reference<?>[]>
      copyOfThreadLocals = new ThreadLocal<>();

  private static final ThreadLocal<Reference<?>[]>
      copyOfInheritableThreadLocals = new ThreadLocal<>();

  private static void saveOldThreadLocals() {
    copyOfThreadLocals.set(copy(threadLocalsField));
    copyOfInheritableThreadLocals.set(
        copy(inheritableThreadLocalsField));
  }

  private static Reference<?>[] copy(Field field) {
    try {
      Thread thread = Thread.currentThread();
      Object threadLocals = field.get(thread);
      if (threadLocals == null) return null;
      Reference<?>[] table =
          (Reference<?>[]) tableField.get(threadLocals);
      return Arrays.copyOf(table, table.length);
    } catch (IllegalAccessException e) {
      throw new IllegalStateException("Access denied", e);
    }
  }

  private static void restoreOldThreadLocals() {
    try {
      restore(inheritableThreadLocalsField,
          copyOfInheritableThreadLocals.get());
      restore(threadLocalsField, copyOfThreadLocals.get());
    } finally {
      copyOfThreadLocals.remove();
      copyOfInheritableThreadLocals.remove();
    }
  }

  private static void restore(Field field, Object value) {
    try {
      Thread thread = Thread.currentThread();
      if (value == null) {
        field.set(thread, null);
      } else {
        tableField.set(field.get(thread), value);
      }
    } catch (IllegalAccessException e) {
      throw new IllegalStateException("Access denied", e);
    }
  }

  /* Reflection fields */

  private static final Field threadLocalsField;

  private static final Field inheritableThreadLocalsField;
  private static final Class<?> threadLocalMapClass;
  private static final Field tableField;
  private static final Class<?> threadLocalMapEntryClass;

  private static final Field threadLocalEntryValueField;

  static {
    try {
      threadLocalsField = field(Thread.class, "threadLocals");
      inheritableThreadLocalsField =
          field(Thread.class, "inheritableThreadLocals");

      threadLocalMapClass =
          inner(ThreadLocal.class, "ThreadLocalMap");

      tableField = field(threadLocalMapClass, "table");
      threadLocalMapEntryClass =
          inner(threadLocalMapClass, "Entry");

      threadLocalEntryValueField =
          field(threadLocalMapEntryClass, "value");
    } catch (NoSuchFieldException e) {
      throw new IllegalStateException(
          "Could not locate threadLocals field in Thread.  " +
              "Will not be able to clear thread locals: " + e);
    }
  }
}
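The core of diff() is a nested loop that compares entries by identity (==) rather than with equals(). Here is that idea in isolation, as a sketch on plain arrays; IdentityDiff is a made-up name, and it skips the empty (null) slots of the arrays in both loops:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone version of the identity-based diff.
class IdentityDiff {
  static List<String> diff(Object[] backup, Object[] current) {
    List<String> changes = new ArrayList<>();
    for (Object cur : current) {
      if (cur != null && !containsSame(backup, cur)) {
        changes.add("ADDED " + cur);   // in current, not in the snapshot
      }
    }
    for (Object old : backup) {
      if (old != null && !containsSame(current, old)) {
        changes.add("REMOVED " + old); // in the snapshot, not in current
      }
    }
    return changes;
  }

  // Compares with ==, just like the entry comparison in diff()
  private static boolean containsSame(Object[] array, Object target) {
    for (Object o : array) {
      if (o == target) return true;
    }
    return false;
  }

  public static void main(String... args) {
    Object a = "a", b = "b", c = "c";
    Object[] backup = {a, null, b, null};  // snapshot taken earlier
    Object[] current = {a, c, null, null}; // table now
    System.out.println(diff(backup, current)); // [ADDED c, REMOVED b]
  }
}
```

A nested loop is quadratic, but as the comment in diff() says, both arrays should be relatively small.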

Running the ThreadLocalCleanerExample class from earlier shows the ThreadLocalCleaner in action.

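Your output will probably contain different identity hash codes; remember from my Identity Crisis newsletter that the algorithm for these numbers is just a random number generator. Here is my output. Notice how, whilst we are inside the try-with-resources body, the thread local value stays the same, and that the second context gets a new SimpleDateFormat, because the first ThreadLocalCleaner removed the entry on close. The listener prints the same @f67a0200 suffix both times because SimpleDateFormat overrides hashCode() to return the hash code of its pattern, so that suffix does not identify the object: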

First ThreadLocalCleaner context
186370029
186370029
186370029
Thread Thread[main,5,main] ADDED ThreadLocal class \
    ThreadLocalCleanerExample$1 with value \
    java.text.SimpleDateFormat@f67a0200
Another ThreadLocalCleaner context
2094548358
2094548358
2094548358
Thread Thread[main,5,main] ADDED ThreadLocal class \
    ThreadLocalCleanerExample$1 with value \
    java.text.SimpleDateFormat@f67a0200

In order to make the code a bit easier to use, I have written a Facade. The Facade design pattern is not meant to stop users from using the subsystem directly, but is supposed to provide a simpler interface to a complicated system. Typically you provide the most common use cases of the subsystem as methods. Our Facade offers two kinds of methods: findAll(Thread) and the printThreadLocals() overloads. The findAll() method returns a Collection of (ThreadLocal, value) entries found inside that thread, and printThreadLocals() prints them to a PrintStream, System.out by default.

package threadcleaner;

import java.io.*;
import java.lang.ref.*;
import java.util.AbstractMap.*;
import java.util.*;
import java.util.Map.*;
import java.util.function.*;

import static threadcleaner.ThreadLocalCleaner.*;

public class ThreadLocalCleaners {
  public static Collection<Entry<ThreadLocal<?>, Object>> findAll(
      Thread thread) {
    Collection<Entry<ThreadLocal<?>, Object>> result =
        new ArrayList<>();
    BiConsumer<ThreadLocal<?>, Object> adder =
        (key, value) ->
            result.add(new SimpleImmutableEntry<>(key, value));
    forEach(thread, adder);
    return result;
  }

  public static void printThreadLocals() {
    printThreadLocals(System.out);
  }

  public static void printThreadLocals(Thread thread) {
    printThreadLocals(thread, System.out);
  }

  public static void printThreadLocals(PrintStream out) {
    printThreadLocals(Thread.currentThread(), out);
  }

  public static void printThreadLocals(Thread thread,
                                       PrintStream out) {
    out.println("Thread " + thread.getName());
    out.println("  ThreadLocals");
    printTable(thread, out);
  }

  private static void printTable(
      Thread thread, PrintStream out) {
    forEach(thread, (key, value) -> {
      out.printf("    {%s,%s", key, value);
      if (value instanceof Reference) {
        out.print("->" + ((Reference<?>) value).get());
      }
      out.println("}");
    });
  }
}

A Thread can contain two different types of ThreadLocal: the normal one and the inheritable one. In almost all cases, we use the normal one. Inheritable means that if you construct a new thread from your current thread, then all the inheritable ThreadLocals in the creating thread are inherited by the new thread. Yeah. I agree with you. Use cases are rather scarce. So just forget about this for now. Or forever :-)
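For the rare case where you do need it, here is a minimal sketch of the difference (the thread local names are made up): a value set in an InheritableThreadLocal is copied into a thread created afterwards, whereas a normal ThreadLocal starts out empty in the new thread:

```java
class InheritableDemo {
  static final InheritableThreadLocal<String> INHERITED =
      new InheritableThreadLocal<>();
  static final ThreadLocal<String> NORMAL = new ThreadLocal<>();

  // Returns what a newly created child thread sees in both thread locals
  static String[] seenByChild() {
    INHERITED.set("from parent");
    NORMAL.set("from parent");
    String[] seen = new String[2];
    // Inheritable values are copied when the Thread object is constructed
    Thread child = new Thread(() -> {
      seen[0] = INHERITED.get();
      seen[1] = NORMAL.get();
    });
    child.start();
    try {
      child.join(); // join() also makes the child's writes visible here
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return seen;
  }

  public static void main(String... args) {
    String[] seen = seenByChild();
    System.out.println(seen[0] + ", " + seen[1]); // from parent, null
  }
}
```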

An obvious use case of this ThreadLocalCleaner is with the ThreadPoolExecutor. We subclass it and override the methods beforeExecute() and afterExecute(). The class is long because we have to code all the constructors. The interesting bit is right at the end.

package threadcleaner;

import java.util.concurrent.*;

public class ThreadPoolExecutorExt extends ThreadPoolExecutor {
  private final ThreadLocalChangeListener listener;

  /* Bunch of constructors following - you can ignore those */

  public ThreadPoolExecutorExt(
      int corePoolSize, int maximumPoolSize, long keepAliveTime,
      TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit,
        workQueue, ThreadLocalChangeListener.EMPTY);
  }

  public ThreadPoolExecutorExt(
      int corePoolSize, int maximumPoolSize, long keepAliveTime,
      TimeUnit unit, BlockingQueue<Runnable> workQueue,
      ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit,
        workQueue, threadFactory,
        ThreadLocalChangeListener.EMPTY);
  }

  public ThreadPoolExecutorExt(
      int corePoolSize, int maximumPoolSize, long keepAliveTime,
      TimeUnit unit, BlockingQueue<Runnable> workQueue,
      RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit,
        workQueue, handler,
        ThreadLocalChangeListener.EMPTY);
  }

  public ThreadPoolExecutorExt(
      int corePoolSize, int maximumPoolSize, long keepAliveTime,
      TimeUnit unit, BlockingQueue<Runnable> workQueue,
      ThreadFactory threadFactory,
      RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit,
        workQueue, threadFactory, handler,
        ThreadLocalChangeListener.EMPTY);
  }

  public ThreadPoolExecutorExt(
      int corePoolSize, int maximumPoolSize, long keepAliveTime,
      TimeUnit unit, BlockingQueue<Runnable> workQueue,
      ThreadLocalChangeListener listener) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
        workQueue);
    this.listener = listener;
  }

  public ThreadPoolExecutorExt(
      int corePoolSize, int maximumPoolSize, long keepAliveTime,
      TimeUnit unit, BlockingQueue<Runnable> workQueue,
      ThreadFactory threadFactory,
      ThreadLocalChangeListener listener) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
        workQueue, threadFactory);
    this.listener = listener;
  }

  public ThreadPoolExecutorExt(
      int corePoolSize, int maximumPoolSize, long keepAliveTime,
      TimeUnit unit, BlockingQueue<Runnable> workQueue,
      RejectedExecutionHandler handler,
      ThreadLocalChangeListener listener) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
        workQueue, handler);
    this.listener = listener;
  }

  public ThreadPoolExecutorExt(
      int corePoolSize, int maximumPoolSize, long keepAliveTime,
      TimeUnit unit, BlockingQueue<Runnable> workQueue,
      ThreadFactory threadFactory,
      RejectedExecutionHandler handler,
      ThreadLocalChangeListener listener) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
        workQueue, threadFactory, handler);
    this.listener = listener;
  }

  /* The interesting bit of this class is below ... */

  private static final ThreadLocal<ThreadLocalCleaner> local =
      new ThreadLocal<>();

  @Override
  protected void beforeExecute(Thread t, Runnable r) {
    // Called in the worker thread just before it runs r
    assert t == Thread.currentThread();
    local.set(new ThreadLocalCleaner(listener));
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    // Also called in the worker thread, after r has completed
    ThreadLocalCleaner cleaner = local.get();
    local.remove();
    cleaner.cleanup();
  }
}

You would use this just like a normal ThreadPoolExecutor, with the difference that this one would restore the state of the thread locals after each Runnable has been executed. You can also again attach a listener in case you need to debug your system. In our example here, you can see that we have attached a listener to dump the added thread locals to our LOG. Notice also that in Java 8, the java.util.logging.Logger methods now take a Supplier<String>, meaning that we do not need code guards anymore to make our logging performant.

import java.text.*;
import java.util.concurrent.*;
import java.util.logging.*;

import threadcleaner.*;

public class ThreadPoolExecutorExtTest {
  private static final Logger LOG = Logger.getLogger(
      ThreadPoolExecutorExtTest.class.getName()
  );

  private static final ThreadLocal<DateFormat> df =
      new ThreadLocal<DateFormat>() {
        protected DateFormat initialValue() {
          return new SimpleDateFormat("yyyy-MM-dd");
        }
      };

  public static void main(String... args)
      throws InterruptedException {
    ThreadPoolExecutor tpe = new ThreadPoolExecutorExt(
        1, 1, 0, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(),
        (m, t, tl, v) -> {
          LOG.warning(
              () -> String.format(
                  "Thread %s %s ThreadLocal %s with value %s%n",
                  t, m, tl.getClass(), v)
          );
        }
    );

    for (int i = 0; i < 10; i++) {
      tpe.submit(() ->
          System.out.println(System.identityHashCode(df.get())));
      Thread.sleep(1000);
    }
    tpe.shutdown();
  }
}

Output on my machine is something like this:

914524658
May 23, 2015 9:28:50 PM ThreadPoolExecutorExtTest lambda$main$1
WARNING: Thread Thread[pool-1-thread-1,5,main] \
    ADDED ThreadLocal class ThreadPoolExecutorExtTest$1 \
    with value java.text.SimpleDateFormat@f67a0200

957671209
May 23, 2015 9:28:51 PM ThreadPoolExecutorExtTest lambda$main$1
WARNING: Thread Thread[pool-1-thread-1,5,main] \
    ADDED ThreadLocal class ThreadPoolExecutorExtTest$1 \
    with value java.text.SimpleDateFormat@f67a0200

466968587
May 23, 2015 9:28:52 PM ThreadPoolExecutorExtTest lambda$main$1
WARNING: Thread Thread[pool-1-thread-1,5,main] \
    ADDED ThreadLocal class ThreadPoolExecutorExtTest$1 \
    with value java.text.SimpleDateFormat@f67a0200
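If you know exactly which ThreadLocals your tasks use, the afterExecute() hook can also do the cleanup without any reflection. The ClearingThreadPool below is a hypothetical, minimal sketch rather than the class from this newsletter: it simply calls remove() on a fixed set of ThreadLocals after every task, so a second task on the same worker thread no longer sees the first task's value:

```java
import java.util.concurrent.*;

class ClearingThreadPoolDemo {
  static final ThreadLocal<String> USER = new ThreadLocal<>();

  static String valueSeenBySecondTask() {
    ClearingThreadPool pool = new ClearingThreadPool(1, USER);
    try {
      Runnable firstRequest = () -> USER.set("left over from task 1");
      Callable<String> secondRequest = USER::get;
      pool.submit(firstRequest).get();
      // The single worker runs afterExecute() before it takes the next task
      return pool.submit(secondRequest).get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String... args) {
    System.out.println(valueSeenBySecondTask()); // null
  }
}

// Hypothetical pool that removes the given ThreadLocals after each task.
class ClearingThreadPool extends ThreadPoolExecutor {
  private final ThreadLocal<?>[] toClear;

  ClearingThreadPool(int threads, ThreadLocal<?>... toClear) {
    super(threads, threads, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>());
    this.toClear = toClear.clone();
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    // Runs in the worker thread, so remove() clears that thread's entries
    for (ThreadLocal<?> threadLocal : toClear) {
      threadLocal.remove();
    }
  }
}
```

The trade-off is the opposite of the ThreadLocalCleaner's: no reflection and no dependence on OpenJDK internals, but any ThreadLocal you did not list, including those created by libraries, is left alone.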

That's it for now. This code has not been tried in the crucible of a production server, so please use it with caution. Thank you for reading this newsletter and also for your support. I really appreciate it!

Kind regards

Heinz

 

About the Author

Heinz Kabutz: Java Champion, author of the Javaspecialists Newsletter, conference speaking regular.
