The Java Specialists' Newsletter
Issue 169 2009-01-22
Category: Performance
Java version: Java 5+
Monitoring Sockets
by Dr. Heinz M. Kabutz
Abstract:
In this newsletter, we show two approaches for listening to bytes on sockets.
The first uses the Delegator from our previous newsletter, whereas the second
uses AspectJ to intercept the call to Socket.getInput/OutputStream. We also
write an MBean to publish this information in JConsole.
Welcome to the 169th issue of The Java(tm) Specialists' Newsletter. We pressed our first
batch of olive oil this week. On Crete they say that the
Akrotiri, where our plot is situated, is one of the best
areas for olives. The farmers that were harvesting the
olives were raving about our trees, saying that their
oil is the best of the best. We've eaten about half a liter
this week alone and it is truly sublime. Unfortunately since
we do not have irrigation water installed yet, the yield is
not that high this year. But come visit us in Crete and
we'll give you a bottle to take home with you.
Monitoring Sockets
In our previous
newsletter, we showed how to subclass
package access classes using reflection. Before we look at
how to use the Delegator to listen to bytes on sockets, here
are some alternative solutions that could work. One option
is to use AspectJ and define pointcuts on the calls to
Socket.getInputStream() and Socket.getOutputStream(). I will
show you that solution at the end of this newsletter.
Another option might be to use BTrace.
Jaroslav Bachorik showed how to monitor sockets
with BTrace. A third option is
to modify java.net.PlainSocketImpl to return different
input and output streams. You would then need to put
your modified java.net.PlainSocketImpl class on the
bootclasspath. You can see such an approach in Newsletter #038.
We will show the most important class first. Most of the
rest of this newsletter will be optional extras that you can
modify to suit your particular environment. In
MonitoringSocketImpl, we subclass SocketImpl
and use the Delegator
to route the method calls to the real instance of
SocketImpl. The only two methods we change are
getInputStream() and
getOutputStream(), by wrapping the Delegator's
streams with socket monitoring streams.
package performance;

import util.Delegator;

import java.io.*;
import java.net.*;
import java.lang.reflect.Field;

public class MonitoringSocketImpl extends SocketImpl {
  private final Delegator delegator;

  public MonitoringSocketImpl() throws IOException {
    this.delegator = new Delegator(this, SocketImpl.class,
        "java.net.SocksSocketImpl");
  }

  private Socket getSocket() throws IOException {
    try {
      Field socket = SocketImpl.class.getDeclaredField("socket");
      socket.setAccessible(true);
      return (Socket) socket.get(this);
    } catch (Exception e) {
      throw new IOException("Could not discover real socket");
    }
  }

  public InputStream getInputStream() throws IOException {
    InputStream real = delegator.invoke();
    return new SocketMonitoringInputStream(getSocket(), real);
  }

  public OutputStream getOutputStream() throws IOException {
    OutputStream real = delegator.invoke();
    return new SocketMonitoringOutputStream(getSocket(), real);
  }

  // the rest of the class is plain delegation to real SocketImpl

  public void create(boolean stream) throws IOException {
    delegator.invoke(stream);
  }

  public void connect(String host, int port)
      throws IOException {
    delegator.invoke(host, port);
  }

  // We specify the exact method to delegate to. Not actually
  // necessary here, but just to show how you would do it.
  public void connect(InetAddress address, int port)
      throws IOException {
    delegator
        .delegateTo("connect", InetAddress.class, int.class)
        .invoke(address, port);
  }

  public void connect(SocketAddress address, int timeout)
      throws IOException {
    delegator.invoke(address, timeout);
  }

  public void bind(InetAddress host, int port)
      throws IOException {
    delegator.invoke(host, port);
  }

  public void listen(int backlog) throws IOException {
    delegator.invoke(backlog);
  }

  public void accept(SocketImpl s) throws IOException {
    delegator.invoke(s);
  }

  public int available() throws IOException {
    Integer result = delegator.invoke();
    return result;
  }

  public void close() throws IOException {
    delegator.invoke();
  }

  public void shutdownInput() throws IOException {
    delegator.invoke();
  }

  public void shutdownOutput() throws IOException {
    delegator.invoke();
  }

  public FileDescriptor getFileDescriptor() {
    return delegator.invoke();
  }

  public InetAddress getInetAddress() {
    return delegator.invoke();
  }

  public int getPort() {
    Integer result = delegator.invoke();
    return result;
  }

  public boolean supportsUrgentData() {
    Boolean result = delegator.invoke();
    return result;
  }

  public void sendUrgentData(int data) throws IOException {
    delegator.invoke(data);
  }

  public int getLocalPort() {
    Integer result = delegator.invoke();
    return result;
  }

  public String toString() {
    return delegator.invoke();
  }

  public void setPerformancePreferences(int connectionTime,
                                        int latency,
                                        int bandwidth) {
    delegator.invoke(connectionTime, latency, bandwidth);
  }

  public void setOption(int optID, Object value)
      throws SocketException {
    delegator.invoke(optID, value);
  }

  public Object getOption(int optID) throws SocketException {
    return delegator.invoke(optID);
  }
}
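The Delegator itself comes from the previous newsletter and is not repeated here. Purely to illustrate the idea of routing a call to a method we cannot invoke directly, here is a minimal sketch using plain reflection. The names Hidden and ReflectiveForwarder are made up for this example, and this is not the Delegator: judging by calls such as delegator.invoke() above, the real class works out the target method for you, whereas this sketch has to be told the method name and parameter types.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical stand-in for a class whose methods we cannot call directly.
class Hidden {
  private int calls = 0;

  protected int add(int a, int b) {
    calls++;
    return a + b;
  }

  int getCalls() {
    return calls;
  }
}

// Minimal sketch of reflective forwarding. NOT the Delegator from the
// previous newsletter; the caller must name the method explicitly.
class ReflectiveForwarder {
  private final Object target;

  ReflectiveForwarder(Object target) {
    this.target = target;
  }

  // Looks up the named method on the target's class and invokes it.
  Object invoke(String name, Class<?>[] types, Object... args)
      throws Exception {
    Method m = target.getClass().getDeclaredMethod(name, types);
    m.setAccessible(true); // allow calls to non-public methods
    try {
      return m.invoke(target, args);
    } catch (InvocationTargetException e) {
      // Unwrap, so that callers see the exception the target threw.
      Throwable cause = e.getCause();
      if (cause instanceof Exception) {
        throw (Exception) cause;
      }
      if (cause instanceof Error) {
        throw (Error) cause;
      }
      throw e;
    }
  }
}
```

A call would then look like new ReflectiveForwarder(hidden).invoke("add", new Class<?>[] {int.class, int.class}, 2, 3). Unwrapping InvocationTargetException matters for a socket wrapper, since callers expect to see the original IOException rather than a reflection exception.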
The next piece of the puzzle is where we create a
SocketImplFactory to be used with
Socket and ServerSocket.
package performance;

import java.net.*;

public class MonitoringSocketFactory
    implements SocketImplFactory {
  public SocketImpl createSocketImpl() {
    try {
      return new MonitoringSocketImpl();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
We install the factory using the static methods
Socket.setSocketImplFactory() and ServerSocket.setSocketFactory().
Note that you are only allowed to call each of these methods once
during the life of the virtual machine. A second call fails with a
SocketException.
SocketImplFactory socketImplFactory =
    new MonitoringSocketFactory();
Socket.setSocketImplFactory(socketImplFactory);
ServerSocket.setSocketFactory(socketImplFactory);
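Because the factory can only be set once per virtual machine, it may help to guard the installation so that a second attempt from our own code is ignored rather than failing. The following is only a sketch under that assumption. SocketFactoryInstaller and installOnce() are hypothetical names that are not part of the code in this newsletter, and the guard only knows about installations made through this helper.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketImplFactory;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, not part of the newsletter's code.
final class SocketFactoryInstaller {
  private static final AtomicBoolean installed =
      new AtomicBoolean(false);

  private SocketFactoryInstaller() {
  }

  // Returns true if this call installed the factory and false if an
  // earlier call through this helper already attempted to do so.
  // If some other code set a factory first, the JDK methods below
  // still throw, and that exception is passed on to the caller.
  static boolean installOnce(SocketImplFactory factory)
      throws IOException {
    if (!installed.compareAndSet(false, true)) {
      return false;
    }
    Socket.setSocketImplFactory(factory);
    ServerSocket.setSocketFactory(factory);
    return true;
  }
}
```

With this in place, several start-up paths could each call SocketFactoryInstaller.installOnce(new MonitoringSocketFactory()) and only the first one would touch the JDK's static setters.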
To have a central place for all the socket monitoring code,
I wrote a Singleton called
SocketMonitoringSystem. Singletons should be
avoided in
normal code, but in this case we want to be able to enable
and disable socket monitoring with as little code as
possible. Another alternative is to have a configuration
file that the socket monitoring code would use.
Our SocketMonitoringSystem allows us to add
observers for the socket monitoring, which will be notified
whenever bytes are read or written on the socket. Here is
the interface that observers need to implement:
package performance;

import java.io.IOException;
import java.net.Socket;

public interface SocketMonitor {
  void write(Socket socket, int data) throws IOException;
  void write(Socket socket, byte[] data, int off, int len)
      throws IOException;
  void read(Socket socket, int data) throws IOException;
  void read(Socket socket, byte[] data, int off, int len)
      throws IOException;
}
The SocketMonitoringSystem then looks like this.
The initForDelegator() method should only be
called if we want to use the Delegator approach to monitoring
sockets. If we want to use AspectJ, we should not call this
method.
package performance;

import java.io.IOException;
import java.net.*;
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArraySet;

public class SocketMonitoringSystem {
  private final static SocketMonitoringSystem instance =
      new SocketMonitoringSystem();

  public static SocketMonitoringSystem getInstance() {
    return instance;
  }

  /**
   * This should only be called if we want to use the reflection
   * approach (Delegator). If we use Aspects, we should not call
   * this method.
   */
  public static void initForDelegator() throws IOException {
    SocketImplFactory socketImplFactory =
        new MonitoringSocketFactory();
    Socket.setSocketImplFactory(socketImplFactory);
    ServerSocket.setSocketFactory(socketImplFactory);
  }

  private SocketMonitoringSystem() {
  }

  private final Collection<SocketMonitor> monitors =
      new CopyOnWriteArraySet<SocketMonitor>();

  public void add(SocketMonitor monitor) {
    monitors.add(monitor);
  }

  public void remove(SocketMonitor monitor) {
    monitors.remove(monitor);
  }

  public void write(Socket socket, int data)
      throws IOException {
    for (SocketMonitor monitor : monitors) {
      monitor.write(socket, data);
    }
  }

  public void write(Socket socket,
                    byte[] data, int offset, int length)
      throws IOException {
    for (SocketMonitor monitor : monitors) {
      monitor.write(socket, data, offset, length);
    }
  }

  public void read(Socket socket, int data)
      throws IOException {
    for (SocketMonitor monitor : monitors) {
      monitor.read(socket, data);
    }
  }

  public void read(Socket socket,
                   byte[] data, int offset, int length)
      throws IOException {
    for (SocketMonitor monitor : monitors) {
      monitor.read(socket, data, offset, length);
    }
  }
}
The socket monitoring streams now simply notify the
SocketMonitoringSystem whenever bytes are read
or written.
package performance;

import java.io.*;
import java.net.Socket;

public class SocketMonitoringInputStream extends InputStream {
  private final Socket socket;
  private final InputStream in;

  public SocketMonitoringInputStream(Socket socket,
                                     InputStream in)
      throws IOException {
    this.socket = socket;
    this.in = in;
  }

  public int read() throws IOException {
    int result = in.read();
    if (result != -1) {
      SocketMonitoringSystem.getInstance().read(socket, result);
    }
    return result;
  }

  public int read(byte[] b, int off, int len)
      throws IOException {
    int length = in.read(b, off, len);
    if (length != -1) {
      SocketMonitoringSystem.getInstance().
          read(socket, b, off, length);
    }
    return length;
  }

  // InputStream's own available() returns 0 and its close() does
  // nothing, so we forward both to the wrapped stream.
  public int available() throws IOException {
    return in.available();
  }

  public void close() throws IOException {
    in.close();
  }
}
package performance;

import java.io.*;
import java.net.Socket;

public class SocketMonitoringOutputStream extends OutputStream {
  private final OutputStream out;
  private final Socket socket;

  public SocketMonitoringOutputStream(Socket socket,
                                      OutputStream out)
      throws IOException {
    this.out = out;
    this.socket = socket;
  }

  public void write(int b) throws IOException {
    out.write(b);
    SocketMonitoringSystem.getInstance().write(socket, b);
  }

  public void write(byte[] b, int off, int length)
      throws IOException {
    out.write(b, off, length);
    SocketMonitoringSystem.getInstance().
        write(socket, b, off, length);
  }

  // OutputStream's own flush() and close() do nothing, so we
  // forward both to the wrapped stream.
  public void flush() throws IOException {
    out.flush();
  }

  public void close() throws IOException {
    out.close();
  }
}
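An alternative worth considering is to extend java.io.FilterInputStream, which already forwards close(), available(), skip() and mark()/reset() to the wrapped stream, so only the two read() methods need overriding. The sketch below is an assumption about how that could look, not the code used in this newsletter. To keep it compilable on its own, the hypothetical CountingInputStream simply counts bytes instead of notifying the SocketMonitoringSystem.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: counts bytes instead of notifying SocketMonitoringSystem,
// so that it compiles without the other classes in this newsletter.
class CountingInputStream extends FilterInputStream {
  private final AtomicLong bytesRead = new AtomicLong();

  CountingInputStream(InputStream in) {
    super(in);
  }

  @Override
  public int read() throws IOException {
    int result = super.read();
    if (result != -1) {
      bytesRead.incrementAndGet();
    }
    return result;
  }

  // FilterInputStream.read(byte[]) ends up here as well, so bytes
  // read through that method are counted exactly once.
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    int length = super.read(b, off, len);
    if (length != -1) {
      bytesRead.addAndGet(length);
    }
    return length;
  }

  long getBytesRead() {
    return bytesRead.get();
  }
}
```

Wrapping a ByteArrayInputStream in a CountingInputStream is a quick way to try this out without opening a real socket. The same idea would apply to the output side with FilterOutputStream, although there write(byte[], int, int) should be overridden explicitly, since the inherited version writes one byte at a time.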
The next job is to write some implementations of
SocketMonitor. In this newsletter we will just
show one implementation, which uses MBeans to publish
information about the bytes sent over the sockets. However, you
could also dump the bytes into files for later inspection.
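To give an idea of what the file-dumping variant might look like, here is a minimal sketch. DumpingSocketMonitor is a hypothetical name, and the SocketMonitor interface is repeated (without its package) only so that the example compiles on its own. The monitor copies everything it sees into two streams supplied by the caller, which could be FileOutputStreams.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

// Copy of the interface shown earlier, repeated so that this sketch
// compiles by itself.
interface SocketMonitor {
  void write(Socket socket, int data) throws IOException;
  void write(Socket socket, byte[] data, int off, int len)
      throws IOException;
  void read(Socket socket, int data) throws IOException;
  void read(Socket socket, byte[] data, int off, int len)
      throws IOException;
}

// Hypothetical monitor that copies all traffic into two streams.
class DumpingSocketMonitor implements SocketMonitor {
  private final OutputStream readDump;
  private final OutputStream writeDump;

  DumpingSocketMonitor(OutputStream readDump,
                       OutputStream writeDump) {
    this.readDump = readDump;
    this.writeDump = writeDump;
  }

  // Monitors are called from many threads, so we lock on the
  // stream that is being written to.
  public void write(Socket socket, int data) throws IOException {
    synchronized (writeDump) {
      writeDump.write(data);
    }
  }

  public void write(Socket socket, byte[] data, int off, int len)
      throws IOException {
    synchronized (writeDump) {
      writeDump.write(data, off, len);
    }
  }

  public void read(Socket socket, int data) throws IOException {
    synchronized (readDump) {
      readDump.write(data);
    }
  }

  public void read(Socket socket, byte[] data, int off, int len)
      throws IOException {
    synchronized (readDump) {
      readDump.write(data, off, len);
    }
  }
}
```

In this sketch the bytes of all sockets end up interleaved in the same two streams. The Socket argument, which is ignored here, could be used to pick a separate file per connection.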
MBean for Monitoring Socket
We start by defining an interface that JConsole can use to
display the information. We could write a nice GUI plugin
for JConsole, but that is beyond the scope of this
newsletter.
package stats;

public interface SocketStatsMBean {
  void printStats();
  void reset();
  long getBytesRead();
  long getBytesWritten();
  String getIndividualBytesWritten();
  String getIndividualBytesRead();
}
We implement this with the SocketStats class, which also
implements the SocketMonitor interface. We use AtomicLong
to count the bytes, so that they can be updated from various
threads with minimal contention.
Note that this class keeps references to Sockets that might
not be used anymore. You need to manually clear them by
calling the reset() method. If you don't like
this behaviour, then please replace the ConcurrentMap with a
WeakHashMap. Since WeakHashMap is not thread-safe, it would
have to be wrapped with Collections.synchronizedMap().
package stats;

import performance.SocketMonitor;

import java.io.IOException;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class SocketStats implements
    SocketStatsMBean, SocketMonitor {
  private final AtomicLong bytesRead = new AtomicLong(0);
  private final AtomicLong bytesWritten = new AtomicLong(0);
  private final ConcurrentMap<Socket, AtomicLong>
      individualBytesRead =
      new ConcurrentHashMap<Socket, AtomicLong>();
  private final ConcurrentMap<Socket, AtomicLong>
      individualBytesWritten =
      new ConcurrentHashMap<Socket, AtomicLong>();

  SocketStats() {
  }

  public void reset() {
    bytesRead.set(0);
    bytesWritten.set(0);
    individualBytesRead.clear();
    individualBytesWritten.clear();
  }

  public long getBytesRead() {
    return bytesRead.longValue();
  }

  public long getBytesWritten() {
    return bytesWritten.longValue();
  }

  public void write(Socket socket, int data) {
    bytesWritten.incrementAndGet();
    increment(socket, 1, individualBytesWritten);
  }

  public void write(Socket socket, byte[] data, int off, int len)
      throws IOException {
    bytesWritten.addAndGet(len);
    increment(socket, len, individualBytesWritten);
  }

  private void increment(Socket socket, int bytes,
                         ConcurrentMap<Socket, AtomicLong> map) {
    AtomicLong counter = map.get(socket);
    if (counter == null) {
      counter = new AtomicLong(0);
      AtomicLong temp = map.putIfAbsent(
          socket, counter);
      if (temp != null) {
        counter = temp;
      }
    }
    counter.addAndGet(bytes);
  }

  public void read(Socket s, int data) {
    bytesRead.incrementAndGet();
    increment(s, 1, individualBytesRead);
  }

  public void read(Socket s, byte[] data, int off, int len) {
    bytesRead.addAndGet(len);
    increment(s, len, individualBytesRead);
  }

  public String getIndividualBytesWritten() {
    return convertToString(individualBytesWritten);
  }

  public String getIndividualBytesRead() {
    return convertToString(individualBytesRead);
  }

  private String convertToString(Map<?, ?> map) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<?, ?> entry : map.entrySet()) {
      sb.append("\n");
      sb.append("\t\t");
      sb.append(entry.getKey());
      sb.append(" -> ");
      sb.append(entry.getValue());
    }
    return sb.toString();
  }

  public String toString() {
    return "SocketStats:\n" +
        "\tbytes read = " + getBytesRead() + "\n" +
        "\tbytes written = " + getBytesWritten() + "\n" +
        "\tindividual bytes read = " +
        getIndividualBytesRead() + "\n" +
        "\tindividual bytes written = " +
        getIndividualBytesWritten();
  }

  public void printStats() {
    System.out.println(this);
  }
}
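For readers who would rather not call reset() by hand, the WeakHashMap idea mentioned before the listing could be sketched as follows. WeakCounters is a hypothetical helper that does not appear in SocketStats. The key type K would be java.net.Socket there, and is left generic here so that the example is easy to try on its own. An entry disappears some time after its key has become unreachable and been garbage collected.

```java
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper; K would be java.net.Socket in SocketStats.
class WeakCounters<K> {
  // WeakHashMap is not thread-safe, hence the synchronized wrapper.
  private final Map<K, AtomicLong> counters =
      Collections.synchronizedMap(new WeakHashMap<K, AtomicLong>());

  void increment(K key, long bytes) {
    AtomicLong counter;
    // The get-then-put sequence must be atomic, so we hold the
    // wrapper's own lock, as its documentation recommends for
    // compound operations.
    synchronized (counters) {
      counter = counters.get(key);
      if (counter == null) {
        counter = new AtomicLong(0);
        counters.put(key, counter);
      }
    }
    counter.addAndGet(bytes);
  }

  long get(K key) {
    AtomicLong counter = counters.get(key);
    return counter == null ? 0 : counter.get();
  }
}
```

The price is a single lock around every lookup, where ConcurrentHashMap would allow concurrent access, so this trades some contention for automatic clean-up.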
We can publish this MBean using the StatsManager class:
package stats;

import javax.management.*;
import java.lang.management.ManagementFactory;

public class StatsManager {
  private static final SocketStats socketStats =
      new SocketStats();

  public static SocketStats getSocketStats() {
    return socketStats;
  }

  static {
    try {
      MBeanServer mbs =
          ManagementFactory.getPlatformMBeanServer();
      mbs.registerMBean(socketStats, new ObjectName(
          "eu.javaspecialists.stats:type=SocketStats"));
    } catch (RuntimeException e) {
      throw e;
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
  }
}
When we start our program, we need to add the
SocketStats instance as an observer to the
SocketMonitoringSystem. We can do that like
this:
SocketMonitoringSystem.initForDelegator();
SocketMonitoringSystem.getInstance().add(
    StatsManager.getSocketStats());
That is all we need to do to monitor our sockets. In
JConsole, we can now go to the MBeans tab, where we should
find the
eu.javaspecialists.stats folder. Inside that
we will find SocketStats. If we click on Attributes, we
will see how many bytes have been read and written on the
sockets. Under Operations, we can reset() the stats or print
them to the console.
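The same attributes that JConsole displays can also be read programmatically through the MBeanServer, which is handy for a quick check without starting a GUI. The sketch below is self-contained and therefore uses a hypothetical ByteCounter bean and a made-up ObjectName instead of SocketStats. Note that a standard MBean's interface has to be named after the class with the suffix MBean, and recent JDKs also insist that it is public.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

class JmxReadSketch {
  // Management interface, named <ClassName>MBean and declared public.
  public interface ByteCounterMBean {
    long getBytesRead();
  }

  // Hypothetical stand-in for SocketStats.
  public static class ByteCounter implements ByteCounterMBean {
    private final long bytesRead;

    public ByteCounter(long bytesRead) {
      this.bytesRead = bytesRead;
    }

    public long getBytesRead() {
      return bytesRead;
    }
  }

  // Registers the bean, reads the attribute back in the same way a
  // JMX client would, and unregisters the bean again.
  static long registerAndRead(ByteCounter counter, String name)
      throws Exception {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName objectName = new ObjectName(name);
    mbs.registerMBean(counter, objectName);
    try {
      // getBytesRead() is exposed as the attribute "BytesRead"
      return (Long) mbs.getAttribute(objectName, "BytesRead");
    } finally {
      mbs.unregisterMBean(objectName);
    }
  }
}
```

For the SocketStats bean, the equivalent lookup would use the name eu.javaspecialists.stats:type=SocketStats and the attributes BytesRead and BytesWritten.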
AspectJ
Instead of using the Delegator, we can also write an Aspect
that intercepts calls to Socket.getInput/OutputStream.
Several readers suggested this approach, but unfortunately
none offered a ready solution. After some research, I
figured out how to do it with the annotation-based AspectJ.
Suggestions on how to improve it are welcome.
package performance.aspects;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import performance.*;
import java.io.*;
import java.net.Socket;
import stats.StatsManager;

@Aspect
public class BoundSocket {
  public BoundSocket() {
    SocketMonitoringSystem.getInstance().add(
        StatsManager.getSocketStats());
    Runtime.getRuntime().addShutdownHook(new Thread() {
      public void run() {
        StatsManager.getSocketStats().printStats();
      }
    });
  }

  @Pointcut("call(* java.net.Socket.getInputStream()) && target(s)")
  void input(Socket s) {
  }

  @Around("input(s)")
  public Object wrapInputStream(ProceedingJoinPoint joinPoint,
                                Socket s)
      throws Throwable {
    InputStream in = (InputStream) joinPoint.proceed();
    return new SocketMonitoringInputStream(s, in);
  }

  @Pointcut("call(* java.net.Socket.getOutputStream()) && target(s)")
  void output(Socket s) {
  }

  @Around("output(s)")
  public Object wrapOutputStream(ProceedingJoinPoint joinPoint,
                                 Socket s)
      throws Throwable {
    OutputStream out = (OutputStream) joinPoint.proceed();
    return new SocketMonitoringOutputStream(s, out);
  }
}
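When we use this Aspect, we do not need to modify our code at
all. In my Delegator solution, I seemed to pick up all
sockets created, even those used by the JMX remoting. With
the Aspect solution, I did not see the JMX remoting sockets.
A likely reason is that call() pointcuts are woven into the
calling code, so calls made from classes that were not woven,
such as those inside the JDK, are not intercepted. I am not
sure what other sockets would be missing with the
Aspect approach.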
I think that's enough code to last you for a few days of
reading. Please let me know if you need help getting this
code working on your system.
Please also let us know as soon as possible if you would like
to attend our courses
in Canada. We already had to waitlist half a dozen
people for Ottawa. The other courses are also rapidly
filling up.
Kind regards
Heinz