Tuesday, May 1, 2018

Java Tips: Creating a Monitoring-friendly ExecutorService

In this article we will be extending an ExecutorService implementation with monitoring capabilities. This monitoring capability will help us to measure a number of pool parameters i.e., active threads, work queue size etc. in a live production environment. It will also enable us to measure task execution time, successful tasks count, and failed tasks count.

Monitoring Library

As for the monitoring library we will be using Metrics. For the sake of simplicity we will be using a ConsoleReporter which will report our metrics to the console. For production-grade applications, we should use an advanced reporter (i.e., Graphite reporter). If you are unfamiliar with Metrics, then I recommend you to go through the getting started guide.

Let's get started.

Extending the ThreadPoolExecutor

We will be using ThreadPoolExecutor as the base class for our new type. Let's call it MonitoredThreadPoolExecutor. This class will accept a MetricRegistry as one of its constructor parameters -

public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
  private final MetricRegistry metricRegistry;

  public MonitoredThreadPoolExecutor(
      int corePoolSize,
      int maximumPoolSize,
      long keepAliveTime,
      TimeUnit unit,
      BlockingQueue<Runnable> workQueue,
      MetricRegistry metricRegistry
  ) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    this.metricRegistry = metricRegistry;
  }

  public MonitoredThreadPoolExecutor(
      int corePoolSize,
      int maximumPoolSize,
      long keepAliveTime,
      TimeUnit unit,
      BlockingQueue<Runnable> workQueue,
      ThreadFactory threadFactory,
      MetricRegistry metricRegistry
  ) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    this.metricRegistry = metricRegistry;
  }

  public MonitoredThreadPoolExecutor(
      int corePoolSize,
      int maximumPoolSize,
      long keepAliveTime,
      TimeUnit unit,
      BlockingQueue<Runnable> workQueue,
      RejectedExecutionHandler handler,
      MetricRegistry metricRegistry
  ) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    this.metricRegistry = metricRegistry;
  }

  public MonitoredThreadPoolExecutor(
      int corePoolSize,
      int maximumPoolSize,
      long keepAliveTime,
      TimeUnit unit,
      BlockingQueue<Runnable> workQueue,
      ThreadFactory threadFactory,
      RejectedExecutionHandler handler,
      MetricRegistry metricRegistry
  ) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    this.metricRegistry = metricRegistry;
  }
}

Registering Gauges to measure pool-specific paramters

A Gauge is an instantaneous measurement of a value. We will be using it to measure different pool parameters like number of active threads, task queue size etc.

Before we can register a Gauge, we need to decide how to calculate a metric name for our thread pool. Each metric, whether it's a Gauge, or a Timer, or simply a Meter, has a unique name. This name is used to identify the metric source. The convention here is to use a dotted string which is often constructed from the fully qualified name of the class being monitored.

For our thread pool, we will be using its fully qualified name as a prefix to our metrics names. Additionally we will add another constructor parameter called poolName, which will be used by the clients to specify instance-specific identifiers.

After implementing these changes the class looks like below -

public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
  private final MetricRegistry metricRegistry;
  private final String metricsPrefix;

  public MonitoredThreadPoolExecutor(
      int corePoolSize,
      int maximumPoolSize,
      long keepAliveTime,
      TimeUnit unit,
      BlockingQueue<Runnable> workQueue,
      MetricRegistry metricRegistry,
      String poolName
  ) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    this.metricRegistry = metricRegistry;
    this.metricsPrefix = MetricRegistry.name(getClass(), poolName);
  }

  // Rest of the constructors
}

Now we are ready to register our Gauges. For this purpose we will define a private method -

private void registerGauges() {
  metricRegistry.register(MetricRegistry.name(metricsPrefix, "corePoolSize"), (Gauge<Integer>) this::getCorePoolSize);
  metricRegistry.register(MetricRegistry.name(metricsPrefix, "activeThreads"), (Gauge<Integer>) this::getActiveCount);
  metricRegistry.register(MetricRegistry.name(metricsPrefix, "maxPoolSize"), (Gauge<Integer>) this::getMaximumPoolSize);
  metricRegistry.register(MetricRegistry.name(metricsPrefix, "queueSize"), (Gauge<Integer>) () -> getQueue().size());
}

For our example we are measuring core pool size, number of active threads, maximum pool size, and task queue size. Depending on monitoring requirements we can register more/less Gauges to measure different properties.

This private method will now be invoked from all constructors -

public MonitoredThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    MetricRegistry metricRegistry,
    String poolName
) {
  super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
  this.metricRegistry = metricRegistry;
  this.metricsPrefix = MetricRegistry.name(getClass(), poolName);
  registerGauges();
}

Measuring Task Execution Time

To measure the task execution time, we will override two life-cycle methods that ThreadPoolExecutor provides - beforeExecute and afterExecute.

As the name implies, beforeExecute callback is invoked prior to executing a task, by the thread that will execute the task. The default implementation of this callback does nothing.

Similarly, the afterExecute callback is invoked after each task is executed, by the thread that executed the task. The default implementation of this callback also does nothing. Even if the task throws an uncaught RuntimeException or Error, this callback will be invoked.

We will be starting a Timer in our beforeExecute override, which will then be used in our afterExecute override to get the total task execution time. To store a reference to the Timer we will introduce a new ThreadLocal field in our class.

The implementation of the callbacks are given below -

public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
  private final MetricRegistry metricRegistry;
  private final String metricsPrefix;
  private ThreadLocal<Timer.Context> taskExecutionTimer = new ThreadLocal<>();

  // Constructors

  @Override
  protected void beforeExecute(Thread thread, Runnable task) {
    super.beforeExecute(thread, task);
    Timer timer = metricRegistry.timer(MetricRegistry.name(metricsPrefix, "task-execution"));
    taskExecutionTimer.set(timer.time());
  }

  @Override
  protected void afterExecute(Runnable task, Throwable throwable) {
    Timer.Context context = taskExecutionTimer.get();
    context.stop();
    super.afterExecute(task, throwable);
  }
}

Recording number of failed tasks due to uncaught exceptions

The second parameter to the afterExecute callback is a Throwable. If non-null, this Throwable refers to the uncaught RuntimeException or Error that caused the execution to terminate. We can use this information to partially count the total number of tasks that were terminated abruptly due to uncaught exceptions.

To get the total number of failed tasks, we must consider another case. Tasks submitted using the execute method will throw any uncaught exceptions, and it will be available as the second argument to the afterExecute callback. However, tasks submitted using the submit method are swallowed by the executor service. This is clearly explained in the JavaDoc (emphasis mine) -
Note: When actions are enclosed in tasks (such as FutureTask) either explicitly or via methods such as submit, these task objects catch and maintain computational exceptions, and so they do not cause abrupt termination, and the internal exceptions are not passed to this method. If you would like to trap both kinds of failures in this method, you can further probe for such cases, as in this sample subclass that prints either the direct cause or the underlying exception if a task has been aborted
Fortunately, the same doc also offers a solution for this, which is to examine the runnable to see if it's a Future, and then get the underlying exception.

Combining these approaches, we can modify our afterExecute method as follows -

@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
  Timer.Context context = taskExecutionTimer.get();
  context.stop();

  super.afterExecute(runnable, throwable);
  if (throwable == null && runnable instanceof Future && ((Future) runnable).isDone()) {
    try {
      ((Future) runnable).get();
    } catch (CancellationException ce) {
      throwable = ce;
    } catch (ExecutionException ee) {
      throwable = ee.getCause();
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
  }
  if (throwable != null) {
    Counter failedTasksCounter = metricRegistry.counter(MetricRegistry.name(metricsPrefix, "failed-tasks"));
    failedTasksCounter.inc();
  }
}


Counting total number of successful tasks

The previous approach can also be used to count the total number of successful tasks: tasks that were completed without throwing any exceptions or errors -

@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
  // Rest of the method body .....

  if (throwable != null) {
    Counter failedTasksCounter = metricRegistry.counter(MetricRegistry.name(metricsPrefix, "failed-tasks"));
    failedTasksCounter.inc();
  } else {
    Counter successfulTasksCounter = metricRegistry.counter(MetricRegistry.name(metricsPrefix, "successful-tasks"));
    successfulTasksCounter.inc();
  }
}


Conclusion

In this article we have looked at a few monitoring-friendly customization to an ExecutorService implementation. Like always, any suggestions/improvements/bug fix will be highly appreciated. As for the example source code, it has been uploaded to Github.

2 comments:

  1. Great article! Very helpful! Just a small suggestion: I started using https://micrometer.io/ instead of http://metrics.dropwizard.io/. That made migrating from Grafana to Prometheus a matter of changing a couple of lines of configuration

    ReplyDelete
    Replies
    1. Nice! I have also been hearing a lot about Prometheus lately, will definitely check it out!

      Delete