
Issue 169: Monitoring Sockets

Author: Dr. Heinz M. Kabutz
Date: 2009-01-22
Java Version: 5
Category: Performance
 

Abstract: In this newsletter, we show two approaches for listening to bytes on sockets. The first uses the Delegator from our previous newsletter, whereas the second uses AspectJ to intercept the call to Socket.getInput/OutputStream. We also write an MBean to publish this information in JConsole.

 

Welcome to the 169th issue of The Java(tm) Specialists' Newsletter. We pressed our first batch of olive oil this week. On Crete they say that the Akrotiri, where our plot is situated, is one of the best areas for olives. The farmers that were harvesting the olives were raving about our trees, saying that their oil is the best of the best. We've eaten about half a liter this week alone and it is truly sublime. Unfortunately since we do not have irrigation water installed yet, the yield is not that high this year. But come visit us in Crete and we'll give you a bottle to take home with you.


Monitoring Sockets

In our previous newsletter, we showed how to subclass package-access classes using reflection. Before we look at how to use the Delegator to listen to bytes on sockets, here are some alternative solutions that could work.

One option is to use AspectJ with a pointcut on the calls to Socket.getInputStream() and Socket.getOutputStream(). I will show you that solution at the end of this newsletter. Another option is BTrace; Jaroslav Bachorik showed how to monitor sockets with it. A third option is to modify java.net.PlainSocketImpl to return different input and output streams. You would then need to put your modified java.net.PlainSocketImpl class on the bootclasspath. You can see such an approach in Newsletter #038.

We will show the most important class first. Most of the rest of this newsletter will be optional extras that you can modify to suit your particular environment. In MonitoringSocketImpl, we subclass SocketImpl and use the Delegator to route the method calls to the real instance of SocketImpl. The only two methods we change are getInputStream() and getOutputStream(), by wrapping the Delegator's streams with socket monitoring streams.

package performance;

import util.Delegator;

import java.io.*;
import java.net.*;
import java.lang.reflect.Field;

public class MonitoringSocketImpl extends SocketImpl {
  private final Delegator delegator;

  public MonitoringSocketImpl() throws IOException {
    this.delegator = new Delegator(this, SocketImpl.class,
        "java.net.SocksSocketImpl");
  }

  private Socket getSocket() throws IOException {
    try {
      Field socket = SocketImpl.class.getDeclaredField("socket");
      socket.setAccessible(true);
      return (Socket) socket.get(this);
    } catch (Exception e) {
      // keep the original exception as the cause for easier debugging
      IOException ex =
          new IOException("Could not discover real socket");
      ex.initCause(e);
      throw ex;
    }
  }

  public InputStream getInputStream() throws IOException {
    InputStream real = delegator.invoke();
    return new SocketMonitoringInputStream(getSocket(), real);
  }

  public OutputStream getOutputStream() throws IOException {
    OutputStream real = delegator.invoke();
    return new SocketMonitoringOutputStream(getSocket(), real);
  }

  // the rest of the class is plain delegation to real SocketImpl
  public void create(boolean stream) throws IOException {
    delegator.invoke(stream);
  }

  public void connect(String host, int port)
      throws IOException {
    delegator.invoke(host, port);
  }

  // We specify the exact method to delegate to.  Not actually
  // necessary here, but just to show how you would do it.
  public void connect(InetAddress address, int port)
      throws IOException {
    delegator
        .delegateTo("connect", InetAddress.class, int.class)
        .invoke(address, port);
  }

  public void connect(SocketAddress address, int timeout)
      throws IOException {
    delegator.invoke(address, timeout);
  }

  public void bind(InetAddress host, int port)
      throws IOException {
    delegator.invoke(host, port);
  }

  public void listen(int backlog) throws IOException {
    delegator.invoke(backlog);
  }

  public void accept(SocketImpl s) throws IOException {
    delegator.invoke(s);
  }

  public int available() throws IOException {
    Integer result = delegator.invoke();
    return result;
  }

  public void close() throws IOException {
    delegator.invoke();
  }

  public void shutdownInput() throws IOException {
    delegator.invoke();
  }

  public void shutdownOutput() throws IOException {
    delegator.invoke();
  }

  public FileDescriptor getFileDescriptor() {
    return delegator.invoke();
  }

  public InetAddress getInetAddress() {
    return delegator.invoke();
  }

  public int getPort() {
    Integer result = delegator.invoke();
    return result;
  }

  public boolean supportsUrgentData() {
    Boolean result = delegator.invoke();
    return result;
  }

  public void sendUrgentData(int data) throws IOException {
    delegator.invoke(data);
  }

  public int getLocalPort() {
    Integer result = delegator.invoke();
    return result;
  }

  public String toString() {
    return delegator.invoke();
  }

  public void setPerformancePreferences(int connectionTime,
                                        int latency,
                                        int bandwidth) {
    delegator.invoke(connectionTime, latency, bandwidth);
  }

  public void setOption(int optID, Object value)
      throws SocketException {
    delegator.invoke(optID, value);
  }

  public Object getOption(int optID) throws SocketException {
    return delegator.invoke(optID);
  }
}
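
To make the delegation above a little less mysterious, here is a much-simplified sketch of the idea. It is not the Delegator from the previous newsletter: MiniDelegator, RealGreeter and ShoutingGreeter are hypothetical names invented for this illustration, and the lookup matches only on method name and parameter count. The sketch assumes that invoke() can find the name of the calling method in the stack trace and then call the same-named method on the real object.

```java
import java.lang.reflect.Method;

// Hypothetical, much-simplified stand-in for util.Delegator.
class MiniDelegator {
  private final Object target;

  MiniDelegator(Object target) {
    this.target = target;
  }

  // Invokes the method on the target that has the same name as the
  // method calling invoke() and the same number of parameters.
  @SuppressWarnings("unchecked")
  <T> T invoke(Object... args) {
    // Element 0 is this invoke() method, element 1 is our caller.
    String name = new Throwable().getStackTrace()[1].getMethodName();
    try {
      for (Method m : target.getClass().getDeclaredMethods()) {
        if (m.getName().equals(name)
            && m.getParameterTypes().length == args.length) {
          m.setAccessible(true);
          return (T) m.invoke(target, args);
        }
      }
    } catch (Exception e) {
      throw new IllegalStateException("Could not delegate " + name, e);
    }
    throw new IllegalStateException("No method named " + name);
  }
}

// Hypothetical "real" implementation that we want to wrap.
class RealGreeter {
  String greet(String who) {
    return "Hello, " + who;
  }
}

// Same method name as the real class; only the result is decorated,
// just as MonitoringSocketImpl decorates the two streams.
class ShoutingGreeter {
  private final MiniDelegator delegator =
      new MiniDelegator(new RealGreeter());

  String greet(String who) {
    String real = delegator.invoke(who);
    return real.toUpperCase();
  }
}
```

Calling new ShoutingGreeter().greet("Crete") routes through RealGreeter.greet() and upper-cases the result. Overloaded methods with the same parameter count would need an explicit selection step, which is presumably what delegateTo() is for in the real Delegator.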

The next piece of the puzzle is where we create a SocketImplFactory to be used with Socket and ServerSocket.

package performance;

import java.net.*;

public class MonitoringSocketFactory
    implements SocketImplFactory {
  public SocketImpl createSocketImpl() {
    try {
      return new MonitoringSocketImpl();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

We install the factory with the static methods in the Socket and ServerSocket classes. Note that each of these methods may only be called once during the life of the virtual machine; a second call throws a SocketException.

SocketImplFactory socketImplFactory =
    new MonitoringSocketFactory();
Socket.setSocketImplFactory(socketImplFactory);
ServerSocket.setSocketFactory(socketImplFactory);

To have a central place for all the socket monitoring code, I wrote a Singleton called SocketMonitoringSystem. Singletons should be avoided in normal code, but in this case we want to be able to enable and disable socket monitoring with as little code as possible. Another alternative is to have a configuration file that the socket monitoring code would use.

Our SocketMonitoringSystem allows us to add observers for the socket monitoring, which will be notified whenever bytes are read or written on the socket. Here is the interface that observers need to implement:

package performance;

import java.io.IOException;
import java.net.Socket;

public interface SocketMonitor {
  void write(Socket socket, int data) throws IOException;

  void write(Socket socket, byte[] data, int off, int len)
      throws IOException;

  void read(Socket socket, int data) throws IOException;

  void read(Socket socket, byte[] data, int off, int len)
      throws IOException;
}

The SocketMonitoringSystem then looks like this. The initForDelegator() method should only be called if we want to use the Delegator approach to monitoring sockets. If we want to use AspectJ, we should not call this method.

package performance;

import java.io.IOException;
import java.net.*;
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArraySet;

public class SocketMonitoringSystem {
  private static final SocketMonitoringSystem instance =
      new SocketMonitoringSystem();

  public static SocketMonitoringSystem getInstance() {
    return instance;
  }

  /**
   * This should only be called if we want to use the reflection
   * approach (Delegator).  If we use Aspects, we should not call
   * this method.
   */
  public static void initForDelegator() throws IOException {
    SocketImplFactory socketImplFactory =
        new MonitoringSocketFactory();
    Socket.setSocketImplFactory(socketImplFactory);
    ServerSocket.setSocketFactory(socketImplFactory);
  }

  private SocketMonitoringSystem() {
  }

  private final Collection<SocketMonitor> monitors =
      new CopyOnWriteArraySet<SocketMonitor>();

  public void add(SocketMonitor monitor) {
    monitors.add(monitor);
  }

  public void remove(SocketMonitor monitor) {
    monitors.remove(monitor);
  }

  public void write(Socket socket, int data)
      throws IOException {
    for (SocketMonitor monitor : monitors) {
      monitor.write(socket, data);
    }
  }

  public void write(Socket socket,
                    byte[] data, int offset, int length)
      throws IOException {
    for (SocketMonitor monitor : monitors) {
      monitor.write(socket, data, offset, length);
    }
  }

  public void read(Socket socket, int data)
      throws IOException {
    for (SocketMonitor monitor : monitors) {
      monitor.read(socket, data);
    }
  }

  public void read(Socket socket,
                   byte[] data, int offset, int length)
      throws IOException {
    for (SocketMonitor monitor : monitors) {
      monitor.read(socket, data, offset, length);
    }
  }
}
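
One plausible reason for keeping the observers in a CopyOnWriteArraySet is that iteration works on a snapshot, so observers can be added or removed while a notification is in progress. The following sketch illustrates that with a hypothetical, simplified Observer interface (not the SocketMonitor interface above) whose only implementation removes itself during its first notification.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

class SelfRemovingObserverDemo {
  // Hypothetical, simplified observer used only for this sketch.
  interface Observer {
    void bytesSeen(int count);
  }

  // Returns {number of notifications received, observers left}.
  static int[] run() {
    final Set<Observer> observers = new CopyOnWriteArraySet<Observer>();
    final int[] calls = {0};
    observers.add(new Observer() {
      public void bytesSeen(int count) {
        calls[0]++;
        // Safe: the for-each loop below iterates over a snapshot.
        observers.remove(this);
      }
    });
    // Two notification rounds; the observer is gone after the first.
    for (int round = 0; round < 2; round++) {
      for (Observer observer : observers) {
        observer.bytesSeen(10);
      }
    }
    return new int[] {calls[0], observers.size()};
  }
}
```

The price is that every add() and remove() copies the underlying array, which is reasonable when observers change rarely and notifications happen on every read and write.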

The socket monitoring streams now simply notify the SocketMonitoringSystem whenever bytes are read or written.

package performance;

import java.io.*;
import java.net.Socket;

public class SocketMonitoringInputStream extends InputStream {
  private final Socket socket;
  private final InputStream in;

  public SocketMonitoringInputStream(Socket socket,
                                     InputStream in)
      throws IOException {
    this.socket = socket;
    this.in = in;
  }

  public int read() throws IOException {
    int result = in.read();
    if (result != -1) {
      SocketMonitoringSystem.getInstance().read(socket, result);
    }
    return result;
  }

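  public int read(byte[] b, int off, int len)
      throws IOException {
    int length = in.read(b, off, len);
    if (length != -1) {
      SocketMonitoringSystem.getInstance().
          read(socket, b, off, length);
    }
    return length;
  }

  // Without these overrides, the inherited no-op close() would
  // leave the real stream open and available() would return 0.
  public int available() throws IOException {
    return in.available();
  }

  public void close() throws IOException {
    in.close();
  }
}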

package performance;

import java.io.*;
import java.net.Socket;

public class SocketMonitoringOutputStream extends OutputStream {
  private final OutputStream out;
  private final Socket socket;

  public SocketMonitoringOutputStream(Socket socket,
                                      OutputStream out)
      throws IOException {
    this.out = out;
    this.socket = socket;
  }

  public void write(int b) throws IOException {
    out.write(b);
    SocketMonitoringSystem.getInstance().write(socket, b);
  }

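  public void write(byte[] b, int off, int length)
      throws IOException {
    out.write(b, off, length);
    SocketMonitoringSystem.getInstance().
        write(socket, b, off, length);
  }

  // OutputStream's own flush() and close() do nothing, so pass
  // them on to the real stream.
  public void flush() throws IOException {
    out.flush();
  }

  public void close() throws IOException {
    out.close();
  }
}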
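
The wrapping idea can be tried out without a real socket. The sketch below is a hypothetical stand-in (CountingInputStream is not part of the newsletter code) that counts bytes locally instead of notifying the SocketMonitoringSystem, and reads from an in-memory stream. It also shows why overriding read() and read(byte[], int, int) is enough: InputStream.read(byte[]) is implemented in terms of the three-argument version.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for SocketMonitoringInputStream: same wrapping
// idea, but it counts bytes locally instead of notifying observers.
class CountingInputStream extends InputStream {
  private final InputStream in;
  private final AtomicLong bytesSeen = new AtomicLong();

  CountingInputStream(InputStream in) {
    this.in = in;
  }

  public int read() throws IOException {
    int result = in.read();
    if (result != -1) {
      bytesSeen.incrementAndGet();
    }
    return result;
  }

  public int read(byte[] b, int off, int len) throws IOException {
    int length = in.read(b, off, len);
    if (length != -1) {
      bytesSeen.addAndGet(length);
    }
    return length;
  }

  public void close() throws IOException {
    in.close();
  }

  long getBytesSeen() {
    return bytesSeen.get();
  }
}

class CountingDemo {
  // Reads a 10-byte in-memory "socket" using all read() variants.
  static long run() {
    try {
      CountingInputStream in = new CountingInputStream(
          new ByteArrayInputStream(new byte[10]));
      in.read();                 // one byte via read()
      in.read(new byte[4]);      // ends up in read(b, 0, 4)
      while (in.read(new byte[4]) != -1) {
        // drain the remaining five bytes
      }
      in.close();
      return in.getBytesSeen();
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

The count covers the single-byte read and the bulk reads alike; the final read that returns -1 adds nothing.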

The next job is to write some implementations of SocketMonitor. In this newsletter we will show just one implementation, which uses MBeans to publish how many bytes were sent over the sockets. However, you could also dump the bytes into files for later inspection.
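
As a rough idea of what such a dumping observer might look like, here is a minimal sketch. HexDumpSocketMonitor and HexDumpDemo are hypothetical names, the SocketMonitor interface is repeated (without its package declaration) only so that the example compiles on its own, and the output format is an arbitrary choice: ">" marks written bytes and "<" marks read bytes. The socket parameter is ignored here; a fuller version might use it to pick a separate file per connection.

```java
import java.io.IOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Copy of the newsletter's SocketMonitor interface, repeated so that
// this sketch compiles on its own.
interface SocketMonitor {
  void write(Socket socket, int data) throws IOException;
  void write(Socket socket, byte[] data, int off, int len)
      throws IOException;
  void read(Socket socket, int data) throws IOException;
  void read(Socket socket, byte[] data, int off, int len)
      throws IOException;
}

// Hypothetical observer that appends a hex dump of the traffic to any
// Appendable, for example a FileWriter or a StringBuilder.
class HexDumpSocketMonitor implements SocketMonitor {
  private final Appendable out;

  HexDumpSocketMonitor(Appendable out) {
    this.out = out;
  }

  public void write(Socket socket, int data) throws IOException {
    dump(">", new byte[] {(byte) data}, 0, 1);
  }

  public void write(Socket socket, byte[] data, int off, int len)
      throws IOException {
    dump(">", data, off, len);
  }

  public void read(Socket socket, int data) throws IOException {
    dump("<", new byte[] {(byte) data}, 0, 1);
  }

  public void read(Socket socket, byte[] data, int off, int len)
      throws IOException {
    dump("<", data, off, len);
  }

  // synchronized so that lines from different threads do not interleave
  private synchronized void dump(String prefix, byte[] data,
                                 int off, int len) throws IOException {
    StringBuilder line = new StringBuilder(prefix);
    for (int i = off; i < off + len; i++) {
      line.append(String.format(" %02x", data[i] & 0xff));
    }
    out.append(line).append('\n');
  }
}

class HexDumpDemo {
  // Feeds two events to the monitor and returns what it recorded.
  static String run() {
    try {
      StringBuilder sink = new StringBuilder();
      SocketMonitor monitor = new HexDumpSocketMonitor(sink);
      byte[] hi = "hi".getBytes(StandardCharsets.US_ASCII);
      monitor.write(null, hi, 0, hi.length);
      monitor.read(null, '!');
      return sink.toString();
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Such an observer would be registered with SocketMonitoringSystem.getInstance().add(...) in the same way as any other SocketMonitor.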

MBean for Monitoring Socket

We start by defining an interface that JConsole can use to display the information. We could write a nice GUI plugin for JConsole, but that is beyond the scope of this newsletter.

package stats;

public interface SocketStatsMBean {
  void printStats();
  void reset();
  long getBytesRead();
  long getBytesWritten();
  String getIndividualBytesWritten();
  String getIndividualBytesRead();
}

We implement this with the SocketStats class, which also implements the SocketMonitor interface. We use AtomicLong to count the bytes, so that they can be updated from various threads with minimal contention.

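Note that this class keeps references to Sockets that might not be used anymore. You need to clear them manually by calling the reset() method. If you don't like this behaviour, you could replace the ConcurrentHashMap with a WeakHashMap. Since WeakHashMap is not thread-safe, it would need to be wrapped with Collections.synchronizedMap(), and the fields and increment() would then have to work with Map rather than ConcurrentMap.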

package stats;

import performance.SocketMonitor;

import java.io.IOException;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class SocketStats implements
    SocketStatsMBean, SocketMonitor {
  private final AtomicLong bytesRead = new AtomicLong(0);
  private final AtomicLong bytesWritten = new AtomicLong(0);
  private final ConcurrentMap<Socket, AtomicLong>
      individualBytesRead =
        new ConcurrentHashMap<Socket, AtomicLong>();
  private final ConcurrentMap<Socket, AtomicLong>
      individualBytesWritten =
        new ConcurrentHashMap<Socket, AtomicLong>();

  SocketStats() {
  }

  public void reset() {
    bytesRead.set(0);
    bytesWritten.set(0);
    individualBytesRead.clear();
    individualBytesWritten.clear();
  }

  public long getBytesRead() {
    return bytesRead.longValue();
  }

  public long getBytesWritten() {
    return bytesWritten.longValue();
  }

  public void write(Socket socket, int data) {
    bytesWritten.incrementAndGet();
    increment(socket, 1, individualBytesWritten);
  }

  public void write(Socket socket, byte[] data, int off, int len)
      throws IOException {
    bytesWritten.addAndGet(len);
    increment(socket, len, individualBytesWritten);
  }

  private void increment(Socket socket, int bytes,
                         ConcurrentMap<Socket, AtomicLong> map) {
    AtomicLong counter = map.get(socket);
    if (counter == null) {
      counter = new AtomicLong(0);
      AtomicLong temp = map.putIfAbsent(
          socket, counter);
      if (temp != null) {
        counter = temp;
      }
    }
    counter.addAndGet(bytes);
  }

  public void read(Socket s, int data) {
    bytesRead.incrementAndGet();
    increment(s, 1, individualBytesRead);
  }

  public void read(Socket s, byte[] data, int off, int len) {
    bytesRead.addAndGet(len);
    increment(s, len, individualBytesRead);
  }

  public String getIndividualBytesWritten() {
    return convertToString(individualBytesWritten);
  }

  public String getIndividualBytesRead() {
    return convertToString(individualBytesRead);
  }

  private String convertToString(Map<?, ?> map) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<?, ?> entry : map.entrySet()) {
      sb.append("\n");
      sb.append("\t\t");
      sb.append(entry.getKey());
      sb.append(" -> ");
      sb.append(entry.getValue());
    }
    return sb.toString();
  }

  public String toString() {
    return "SocketStats:\n" +
        "\tbytes read    = " + getBytesRead() + "\n" +
        "\tbytes written = " + getBytesWritten() + "\n" +
        "\tindividual bytes read    = " +
          getIndividualBytesRead() + "\n" +
        "\tindividual bytes written = " +
          getIndividualBytesWritten();
  }

  public void printStats() {
    System.out.println(this);
  }
}
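
For readers who prefer the weak-reference variant mentioned before the listing, the per-socket counters could be sketched roughly as follows. WeakCounters is a hypothetical helper, not part of the newsletter code, and it assumes Java 8 or later for computeIfAbsent() and lambdas, whereas the listing above targets Java 5. The demo uses a plain Object as a stand-in for a Socket.

```java
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: per-key byte counters whose entries can be
// garbage collected once the key (e.g. a Socket) is no longer in use.
class WeakCounters<K> {
  // WeakHashMap is not thread-safe, so we wrap it.
  private final Map<K, AtomicLong> counters =
      Collections.synchronizedMap(new WeakHashMap<K, AtomicLong>());

  void increment(K key, int bytes) {
    // computeIfAbsent() replaces the get()/putIfAbsent() sequence;
    // the synchronized wrapper makes it atomic.
    counters.computeIfAbsent(key, k -> new AtomicLong())
        .addAndGet(bytes);
  }

  long get(K key) {
    AtomicLong counter = counters.get(key);
    return counter == null ? 0 : counter.get();
  }
}

class WeakCountersDemo {
  // Four threads each record 1000 writes of 2 bytes for the same key.
  static long run() {
    final WeakCounters<Object> counters = new WeakCounters<Object>();
    final Object socketStandIn = new Object();
    Thread[] threads = new Thread[4];
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread(() -> {
        for (int j = 0; j < 1000; j++) {
          counters.increment(socketStandIn, 2);
        }
      });
      threads[i].start();
    }
    try {
      for (Thread thread : threads) {
        thread.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return counters.get(socketStandIn);
  }
}
```

The trade-off is that every update goes through a single lock, so this variant gives up some of the low contention of ConcurrentHashMap in exchange for automatic cleanup. Iterating over a synchronized map, as convertToString() would, also requires holding the map's lock manually.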

We can publish this MBean using the StatsManager class:

package stats;

import javax.management.*;
import java.lang.management.ManagementFactory;

public class StatsManager {
  private static final SocketStats socketStats =
      new SocketStats();

  public static SocketStats getSocketStats() {
    return socketStats;
  }

  static {
    try {
      MBeanServer mbs =
          ManagementFactory.getPlatformMBeanServer();
      mbs.registerMBean(socketStats, new ObjectName(
          "eu.javaspecialists.stats:type=SocketStats"));
    } catch (RuntimeException e) {
      throw e;
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
  }
}
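
To check a registration like this without starting JConsole, the attributes and operations can also be accessed through the MBeanServer itself. The sketch below uses a hypothetical ByteCounter MBean and the made-up object name example.stats:type=ByteCounter rather than the SocketStats class, so that it runs on its own. It relies on the standard MBean convention that the management interface is public and named after the implementing class plus the suffix "MBean".

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

class MBeanReadBackDemo {
  // Standard MBean rule: a public interface named <ClassName>MBean.
  public interface ByteCounterMBean {
    long getBytesRead();   // exposed as the attribute "BytesRead"
    void reset();          // exposed as the operation "reset"
  }

  // Hypothetical MBean with a fixed starting value for the demo.
  public static class ByteCounter implements ByteCounterMBean {
    private volatile long bytesRead = 42;

    public long getBytesRead() {
      return bytesRead;
    }

    public void reset() {
      bytesRead = 0;
    }
  }

  // Returns the attribute value before and after calling reset().
  static long[] run() {
    try {
      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
      ObjectName name =
          new ObjectName("example.stats:type=ByteCounter");
      mbs.registerMBean(new ByteCounter(), name);
      long before = (Long) mbs.getAttribute(name, "BytesRead");
      mbs.invoke(name, "reset", new Object[0], new String[0]);
      long after = (Long) mbs.getAttribute(name, "BytesRead");
      mbs.unregisterMBean(name);
      return new long[] {before, after};
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
  }
}
```

JConsole performs essentially the same getAttribute() and invoke() calls over a remote connection when we click through the Attributes and Operations nodes.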

When we start our program, we need to add the SocketStats instance as an observer to the SocketMonitoringSystem. We can do that like this:

SocketMonitoringSystem.initForDelegator();
SocketMonitoringSystem.getInstance().add(
    StatsManager.getSocketStats());

That is all we need to do to monitor our sockets. In JConsole, we can now go to the MBeans tab, where we should find the eu.javaspecialists.stats folder. Inside it is SocketStats. If we click on Attributes, we see how many bytes have been read from and written to the sockets. Under Operations, we can reset() the stats or print them to the console.

AspectJ

Instead of using the Delegator, we can also write an Aspect that intercepts calls to Socket.getInput/OutputStream. Several readers suggested this approach, but unfortunately none offered a ready solution. After some research, I figured out how to do it with the annotation-based AspectJ. Suggestions on how to improve it are welcome.

package performance.aspects;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import performance.*;

import java.io.*;
import java.net.Socket;

import stats.StatsManager;

@Aspect
public class BoundSocket {
  public BoundSocket() {
    SocketMonitoringSystem.getInstance().add(
        StatsManager.getSocketStats());
    Runtime.getRuntime().addShutdownHook(new Thread() {
      public void run() {
        StatsManager.getSocketStats().printStats();
      }
    });
  }
  @Pointcut("call(* java.net.Socket.getInputStream()) && target(s)")
  void input(Socket s) {
  }

  @Around("input(s)")
  public Object wrapInputStream(ProceedingJoinPoint joinPoint,
                                Socket s)
      throws Throwable {
    InputStream in = (InputStream) joinPoint.proceed();
    return new SocketMonitoringInputStream(s, in);
  }

  @Pointcut("call(* java.net.Socket.getOutputStream()) && target(s)")
  void output(Socket s) {
  }

  @Around("output(s)")
  public Object wrapOutputStream(ProceedingJoinPoint joinPoint,
                                 Socket s)
      throws Throwable {
    OutputStream out = (OutputStream) joinPoint.proceed();
    return new SocketMonitoringOutputStream(s, out);
  }
}

When we use this Aspect, we do not need to modify our code at all. In my Delegator solution, I seemed to pick up all sockets created, even those used by the JMX remoting. With the Aspect solution, I did not see the JMX remoting sockets. A likely reason is that a call() pointcut only matches call sites in classes that have been woven, and the JDK's own classes normally are not. I am not sure which other sockets would be missed with the Aspect approach.

I think that's enough code to last you for a few days of reading. Please let me know if you need help getting this code working on your system.

Kind regards

Heinz

 
