The Java Specialists' Newsletter
Issue 178, 2009-11-14. Category: Tips and Tricks. Java version: 5+

WalkingCollection

by Dr. Heinz M. Kabutz
Abstract:
We look at how we could internalize the iteration into a collection by introducing a Processor interface that is applied to each element. This allows us to manage concurrency from within the collection.

Welcome to the 178th issue of The Java(tm) Specialists' Newsletter, sent to you from Chania on the beautiful island of Crete. A few days ago, we spontaneously decided to make a spit roast with some friends from Denver. When we bought our house, it came with a fire pit and hooks for a spit, so recently I invested in a motor for turning the meat around. So here we were, sitting outside in our shorts at ten at night in Europe in November, watching the fat drip onto the coals. The spit roast must be one of the easiest ways of cooking meat: you simply put it on and then sit there watching it get ready, whilst chatting about politics and philosophy. Now why did I not buy this equipment when I still lived in South Africa? Do let me know if you come to Chania in Crete and we'll have a feast together, assuming that I am home ... :-)

I will be in Montreal from the 22-26 Nov 2009 and San Jose from the 6-11 Dec 2009. By the time I get home from those trips, I will have done 93 flights this year! Please let me know if you are interested in attending our Java Specialist Master Course in San Jose on December 6-11. The course is gaining momentum and we expect it to sell out. We do not know when we will be able to run it again in San Jose.

WalkingCollection

The Iterator design pattern, described in the Gang-of-Four book, lists more implementation options than are available in the java.util.* classes. One of the questions asked is: who controls the iteration? In the standard Java Iterator idiom, the client controls it, for example like this:

Iterator<String> it = names.iterator();
while(it.hasNext()) {
  String name = it.next();
  System.out.println(name);
}
  

When we use the Java 5 for-each loop, the iteration is still controlled by the client, even though we do not explicitly call the next() method.

for(String name : names) {
  System.out.println(name);
}
  

Combining concurrency and iteration is tricky. The approach in Java 1 was to synchronize everything. However, we could still iterate through a Vector whilst it was being changed, resulting in unpredictable behaviour. In the Java 2 idiom, collections were unsynchronized by default. We would thus need to lock the entire collection whilst we were iterating to avoid a ConcurrentModificationException. In the Java 5 idiom, we see classes like CopyOnWriteArrayList, where the underlying array is copied every time it is modified. This is expensive, but guarantees lock-free iteration.
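To make the difference between the Java 2 and Java 5 idioms concrete, here is a minimal sketch. It is single-threaded for simplicity, since modifying a list in the middle of a for-each loop is enough to trigger the fail-fast check. The class name IterationIdiomsDemo and the helper failsFast() are my own names for this illustration and not part of any library.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class IterationIdiomsDemo {
  // Adds an element in the middle of a for-each loop and reports
  // whether the list's iterator noticed the change and failed fast.
  public static boolean failsFast(List<String> names) {
    try {
      for (String name : names) {
        if (name.equals("Heinz")) names.add("Dirk");
      }
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    List<String> plain = new ArrayList<String>();
    plain.add("John");
    plain.add("Heinz");
    plain.add("Anton");
    // Copy taken before either list is modified.
    List<String> cow = new CopyOnWriteArrayList<String>(plain);

    // ArrayList iterators check a modification count on next().
    System.out.println("ArrayList fails fast: " + failsFast(plain));
    // CopyOnWriteArrayList iterates over the array as it was when
    // the iterator was created, so the add() goes unnoticed.
    System.out.println("CopyOnWriteArrayList fails fast: " + failsFast(cow));
    System.out.println("cow = " + cow);
  }
}
```

The CopyOnWriteArrayList iterator never sees "Dirk" during the loop, because the add() replaced the underlying array with a new copy; the element only shows up when we print the list afterwards.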

In this alternative approach, we control the iteration from within the collection, making concurrency easier.

We define an interface Processor with a single method process() that returns a boolean. The return value tells the collection whether to carry on: returning false stops the iteration. Our collection would then apply this processor to every element in the collection. For example, here is my Processor interface with my collection's iterate() method.

public interface Processor<E> {
  boolean process(E e);
}

// in our collection class
public void iterate(Processor<E> processor) {
  for (E e : wrappedCollection) {
    if (!processor.process(e)) break;
  }
}
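The boolean return value is what lets a processor cut the walk short. As a small sketch of that, the following processor remembers the first element that starts with a given prefix and then returns false. The names FirstMatchDemo and FirstWithPrefix are hypothetical, and the static iterate() helper is only a stand-in for the collection's iterate() method so that the example compiles on its own.

```java
import java.util.Arrays;
import java.util.Collection;

public class FirstMatchDemo {
  // Same shape as the Processor interface shown above, repeated
  // here so that this sketch is self-contained.
  public interface Processor<E> {
    boolean process(E e);
  }

  // Stand-in for the collection's iterate() method.
  public static <E> void iterate(Collection<E> elements,
                                 Processor<E> processor) {
    for (E e : elements) {
      if (!processor.process(e)) break;
    }
  }

  // Hypothetical processor: keeps the first match and stops.
  public static class FirstWithPrefix implements Processor<String> {
    private final String prefix;
    private String found;
    private int visited;

    public FirstWithPrefix(String prefix) {
      this.prefix = prefix;
    }

    public boolean process(String s) {
      visited++;
      if (s.startsWith(prefix)) {
        found = s;
        return false; // tell the collection to stop iterating
      }
      return true;
    }

    public String getFound() { return found; }
    public int getVisited() { return visited; }
  }

  public static void main(String[] args) {
    FirstWithPrefix p = new FirstWithPrefix("A");
    iterate(Arrays.asList("John", "Andre", "Neil", "Heinz", "Anton"), p);
    System.out.println(p.getFound() + " after " +
        p.getVisited() + " elements");
  }
}
```

Here the walk stops at "Andre", the second element, and "Anton" is never visited.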
  

We can then use a ReadWriteLock to differentiate between methods that modify the state of the collection and those that do not. For example, the add(E) method would acquire the write lock, but the size() method only the read lock.

The easiest way to implement a collection is to extend AbstractCollection. All we have to do is implement the size() and iterator() methods, like so:

import java.util.*;

public class SimpleCollection<E> extends AbstractCollection<E> {
  private final E[] values;

  public SimpleCollection(E[] values) {
    this.values = values.clone();
  }

  public Iterator<E> iterator() {
    return new Iterator<E>() {
      private int pos = 0;
      public boolean hasNext() {
        return pos < values.length;
      }

      public E next() {
        // Thanks to Volker Glave for pointing out that we need
        // to throw a NoSuchElementException if we try to go past
        // the end of the array.
        if (!hasNext()) throw new NoSuchElementException();
        return values[pos++];
      }

      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  public int size() {
    return values.length;
  }
}
  

This is an immutable collection, but by implementing the add() and Iterator.remove() methods, we could make it into a normal mutable collection. Methods like isEmpty() and toString() simply use our internal iterator() and size() methods.

public class SimpleCollectionTest {
  public static void main(String[] args) {
    String[] arr = {"John", "Andre", "Neil", "Heinz", "Anton"};
    SimpleCollection<String> names =
        new SimpleCollection<String>(arr);

    for (String name : names) { // works
      System.out.println(name);
    }

    System.out.println(names); // works

    System.out.println(names.isEmpty()); // works

    names.add("Dirk"); // throws UnsupportedOperationException

    names.remove("Neil"); // throws UnsupportedOperationException
  }
}
  

In our WalkingCollection, we also use the concept of ReadWriteLocks to differentiate between read-only and read-write methods. We should be able to iterate with multiple threads at the same time, but only one thread may modify it at once. I am a bit wary of ReadWriteLocks due to the possibility of starvation, as described in Newsletter #165.

An initial implementation of the WalkingCollection just overrides the minimum number of methods. If you want to use bulk update methods like addAll(), it would be more efficient to only acquire the lock a single time. However, I have not called the bulk update methods very often in my last 12.5 years of Java programming, so will leave that as an "exercise for the reader".
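For readers who do want to try the exercise, here is one possible sketch of a bulk addAll() that takes the write lock once for the whole batch. BulkAddSketch is a hypothetical, cut-down stand-in for the WalkingCollection, so it leaves out iterate() and the iteration check, and its iterator() simply walks a read-only snapshot to keep the example short.

```java
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class BulkAddSketch<E> extends AbstractCollection<E> {
  private final Collection<E> wrapped = new ArrayList<E>();
  private final ReentrantReadWriteLock rwlock =
      new ReentrantReadWriteLock();

  public boolean add(E e) {
    rwlock.writeLock().lock();
    try {
      return wrapped.add(e);
    } finally {
      rwlock.writeLock().unlock();
    }
  }

  // The inherited addAll() calls add() per element and would thus
  // lock and unlock once per element. Here we lock a single time.
  public boolean addAll(Collection<? extends E> c) {
    rwlock.writeLock().lock();
    try {
      return wrapped.addAll(c);
    } finally {
      rwlock.writeLock().unlock();
    }
  }

  public int size() {
    rwlock.readLock().lock();
    try {
      return wrapped.size();
    } finally {
      rwlock.readLock().unlock();
    }
  }

  // Simplification for this sketch: iterate over a read-only copy.
  public Iterator<E> iterator() {
    rwlock.readLock().lock();
    try {
      return Collections.unmodifiableList(
          new ArrayList<E>(wrapped)).iterator();
    } finally {
      rwlock.readLock().unlock();
    }
  }

  public static void main(String[] args) {
    BulkAddSketch<Long> ages = new BulkAddSketch<Long>();
    ages.addAll(Arrays.asList(10L, 35L, 12L, 33L));
    System.out.println(ages.size() + " elements: " + ages);
  }
}
```

In a collection that also tracks whether the current thread is iterating, the same check that guards add() would presumably need to run before the write lock is acquired in addAll() as well.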

One of the issues with the ReentrantReadWriteLock implementation is that you can downgrade a write lock to a read lock, but you cannot upgrade a read lock to a write lock. You do not even get an exception; the thread simply hangs. In our implementation, we thus keep a ThreadLocal to indicate that we are busy iterating and check this before acquiring the write lock. An exception is much easier to diagnose than a thread that sits in the WAITING state forever, parked on a lock it can never get.
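The one-way nature of the lock can be seen in isolation with tryLock(), which returns immediately instead of waiting. This is only a small sketch; LockUpgradeDemo, canDowngrade() and canUpgrade() are names I made up for the illustration.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockUpgradeDemo {
  // Whilst holding the write lock, try to also get the read lock.
  public static boolean canDowngrade(ReentrantReadWriteLock rwlock) {
    rwlock.writeLock().lock();
    try {
      boolean gotRead = rwlock.readLock().tryLock();
      if (gotRead) rwlock.readLock().unlock();
      return gotRead;
    } finally {
      rwlock.writeLock().unlock();
    }
  }

  // Whilst holding the read lock, try to also get the write lock.
  // We use tryLock() because lock() would wait forever here.
  public static boolean canUpgrade(ReentrantReadWriteLock rwlock) {
    rwlock.readLock().lock();
    try {
      boolean gotWrite = rwlock.writeLock().tryLock();
      if (gotWrite) rwlock.writeLock().unlock();
      return gotWrite;
    } finally {
      rwlock.readLock().unlock();
    }
  }

  public static void main(String[] args) {
    ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();
    System.out.println("downgrade possible: " + canDowngrade(rwlock));
    System.out.println("upgrade possible: " + canUpgrade(rwlock));
  }
}
```

This prints true for the downgrade and false for the upgrade. On Java 6 and later, ReentrantReadWriteLock also offers getReadHoldCount(), which reports how many read holds the current thread has and might serve a similar purpose to the ThreadLocal flag; I have not used it here since this newsletter targets Java 5.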

import java.util.*;
import java.util.concurrent.locks.*;

public class WalkingCollection<E>
    extends AbstractCollection<E> {
  private final static ThreadLocal<Boolean> iterating =
      new ThreadLocal<Boolean>() {
        protected Boolean initialValue() {
          return false;
        }
      };
  private final Collection<E> wrappedCollection;
  private final ReentrantReadWriteLock rwlock =
      new ReentrantReadWriteLock();

  public WalkingCollection(Collection<E> wrappedCollection) {
    this.wrappedCollection = wrappedCollection;
  }

  public void iterate(Processor<E> processor) {
    rwlock.readLock().lock();
    try {
      iterating.set(true);
      for (E e : wrappedCollection) {
        if (!processor.process(e)) break;
      }
    } finally {
      iterating.set(false);
      rwlock.readLock().unlock();
    }
  }

  public Iterator<E> iterator() {
    rwlock.readLock().lock();
    try {
      final Iterator<E> wrappedIterator =
          wrappedCollection.iterator();
      return new Iterator<E>() {
        public boolean hasNext() {
          rwlock.readLock().lock();
          try {
            return wrappedIterator.hasNext();
          } finally {
            rwlock.readLock().unlock();
          }
        }

        public E next() {
          rwlock.readLock().lock();
          try {
            return wrappedIterator.next();
          } finally {
            rwlock.readLock().unlock();
          }
        }

        public void remove() {
          checkForIteration();
          rwlock.writeLock().lock();
          try {
            wrappedIterator.remove();
          } finally {
            rwlock.writeLock().unlock();
          }
        }
      };
    } finally {
      rwlock.readLock().unlock();
    }
  }

  public int size() {
    rwlock.readLock().lock();
    try {
      return wrappedCollection.size();
    } finally {
      rwlock.readLock().unlock();
    }
  }

  public boolean add(E e) {
    checkForIteration();
    rwlock.writeLock().lock();
    try {
      return wrappedCollection.add(e);
    } finally {
      rwlock.writeLock().unlock();
    }
  }

  private void checkForIteration() {
    if (iterating.get())
      throw new IllegalMonitorStateException(
          "Cannot modify whilst iterating");
  }
}
  

To use this, we define processors that can do something with the contents of the collection. For example, the PrintProcessor prints each element to the console:

public class PrintProcessor<E> implements Processor<E> {
  public boolean process(E o) {
    System.out.println(">>> " + o);
    return true;
  }
}
  

The AddProcessor adds up the numbers in the collection and returns the total as a double:

public class AddProcessor<N extends Number>
    implements Processor<N> {
  private double total = 0;

  public boolean process(N n) {
    total += n.doubleValue();
    return true;
  }

  public double getTotal() {
    return total;
  }

  public void reset() {
    total = 0;
  }
}
  

We can even define a CompositeProcessor that aggregates several processors together:

import java.util.*;

public class CompositeProcessor<E>
    implements Processor<E> {
  private final List<Processor<E>> processors =
      new ArrayList<Processor<E>>();

  public void add(Processor<E> processor) {
    processors.add(processor);
  }

  public boolean process(E e) {
    for (Processor<E> processor : processors) {
      if (!processor.process(e)) return false;
    }
    return true;
  }
}
  

We can combine these in a test class that processes a bunch of numbers, like this:

public class WalkingCollectionTest {
  public static void main(String[] args) {
    WalkingCollection<Long> ages = new WalkingCollection<Long>(
        new java.util.ArrayList<Long>()
    );

    ages.add(10L);
    ages.add(35L);
    ages.add(12L);
    ages.add(33L);

    PrintProcessor<Long> pp = new PrintProcessor<Long>();
    ages.iterate(pp);

    AddProcessor<Long> ap = new AddProcessor<Long>();
    ages.iterate(ap);
    System.out.println("ap.getTotal() = " + ap.getTotal());

    // composite
    System.out.println("Testing Composite");
    ap.reset();

    CompositeProcessor<Long> composite =
        new CompositeProcessor<Long>();
    composite.add(new Processor<Long>() {
      private long previous = Long.MIN_VALUE;
      public boolean process(Long current) {
        boolean result = current >= previous;
        previous = current;
        return result;
      }
    });
    composite.add(ap);
    composite.add(pp);
    ages.iterate(composite);
    System.out.println("ap.getTotal() = " + ap.getTotal());
  }
}
  

Here is the output from our test code:

>>> 10
>>> 35
>>> 12
>>> 33
ap.getTotal() = 90.0
Testing Composite
>>> 10
>>> 35
ap.getTotal() = 45.0
  

One of the restrictions of the iterate() method is that we cannot modify the collection from within the process() methods. We can "downgrade" a write lock to a read lock, but not the other way round. During the iteration, we are holding the read lock. In our WalkingCollection, we throw an IllegalMonitorStateException when this happens:

public class WalkingCollectionBrokenTest {
  public static void main(String[] args) {
    final WalkingCollection<String> names =
        new WalkingCollection<String>(
            new java.util.ArrayList<String>()
        );

    names.add("Maximilian");
    names.add("Constance");
    names.add("Priscilla");
    names.add("Evangeline");

    Processor<String> pp = new Processor<String>() {
      public boolean process(String s) {
        if ("Priscilla".equals(s)) names.remove(s);
        return true;
      }
    };
    names.iterate(pp);
  }
}
  

We see the following output:

Exception in thread "main" java.lang.IllegalMonitorStateException:
    Cannot modify whilst iterating
  at WalkingCollection.checkForIteration(WalkingCollection.java:93)
  ... etc.
  

I created a course on Design Patterns before my daughter Connie was born. She is now 8 years old. Over the years, I added patterns and removed obsolete ones, but a surprising amount of material has survived 8 years of programming advances. Once a company has bought one of my Design Patterns courses, they usually end up wanting to train dozens (or even hundreds) of their developers, so it remains one of my best selling courses of all time. Click here for more information about the Design Patterns Course.

Time to go out for a walk with my kids, so I hope you enjoyed this newsletter.

Kind regards

Heinz

© 2010-2016 Heinz Kabutz - All Rights Reserved
Oracle and Java are registered trademarks of Oracle and/or its affiliates. Other names may be trademarks of their respective owners. JavaSpecialists.eu is not connected to Oracle, Inc. and is not sponsored by Oracle, Inc.