July 26, 2015

CompletableFuture: Clean callbacks with Java 8’s Promises (part 1 of 4)



I’ve recently stumbled across the concept of promises and the fact that they were somewhat secretly introduced in Java 8. The CompletableFuture API in my opinion has the potential to change functional, asynchronous programming in Java as much as Lambdas do. I’ll cover the basics of this API in this article.

TL;DR: There’s a human-readable overview of the CompletableFuture callback API on the last page. Maybe that’s all you’re looking for.

Java 8’s CompletableFuture finally brings the concept of promises (as e.g. included in the Scala language and in the most recent ECMAScript 2015 standard) and as such monadic functions to the Java language. They are a brilliant choice to deal with asynchronous computations as they let us trade “callback hell” for a declarative, reactive API. Even if that sounds very academic, they offer real benefits for event-driven asynchronous programming you don’t want to miss.

With this article, I hope to cover the CompletableFuture API exhaustively; there’s also an accompanying GitHub repository which contains JUnit tests with example applications of the API.

Why should I make a Promise?

A promise is a future

According to the CompletableFuture’s Javadoc, a promise is a Future that may be explicitly completed. Well… Okay, let’s start the other way around. A Future represents the result of a computation which may or may not be completed at some point in the future. In case you are not already familiar with futures, here’s an example:
// 1 - build the task
FutureTask<String> retrieveName = new FutureTask<>(() -> {
    sleep(WAIT_MILIS);
    return "Future";
});

// 2 - start the task
startThread(() -> {
    retrieveName.run();

    // 3 - collect the result
    try {
        print(retrieveName.get());
    } catch (InterruptedException | ExecutionException ex) {
        throw new RuntimeException(ex);
    }
});
Here, a future task is created which does some calculations and eventually returns a String. FutureTask is an implementation of the Future interface. It wraps around a Callable (a Callable is a Runnable with doesn’t return void but a return value; here, the Callable is implemented as a lambda expression). It provides methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation (quote from the API).

In the example, the future task is scheduled for execution asynchronously, thus on a new thread (FutureTask#run() starts the task’s computation, let’s assume that #startThread(…) does Thread#start() to start the Thread). You would typically use an ExecutorService for threaded execution, but I neglected this facility to keep the code as simple as possible.

After it has been scheduled, the task may or may not be completed at some point in the future. That’s where one of those methods mentioned earlier come in handy.

Here, we use FutureTask#get() to query for the computed result. This method waits if necessary for the computation to complete, blocking the caller thread, and then retrieves its result. We then use the result in a subsequent function (here, it’s a #print(…) function). After all, the whole point of building up this Future value setup is that we want to do anything with the return value once we receive it. We always want to do anything with a Future’s return value; otherwise, we would rather start a simple Runnable for a fire & forget asynchronous task.

Typically, we already know what we want to do with the return value before we even start the task. In the example, it’s invoking the #print(…) method with the return value. It seems silly to wait for the task to complete in order to then instruct it what to do next (and repeat this for all subsequent steps). The conceptual problem with a future here is that you have to actively query for its completion using #get(). That’s polling. Wouldn’t it be much more elegant and reactive to have a way to declaratively react on the completion event? Through an… event listener, a callback? Now, we’re slowly getting there.

The problem with callbacks

So we want to define an event listener – a callback – which is invoked as soon as the asynchronous runnable has completed. It will collect the result and process it.

In order to define a task which executes a callback function rather than returning any value, it’s enough to derive from Runnable (for technical reasons, we have to derive, we cannot create a derived Runnable implicitly with lambdas). And we have to derive because we want to be able to provide the callback function via the constructor. The specialized runnable could look like this:
private static class RunnableCallback implements Runnable {
    private final Consumer<String> callback;
    
    public RunnableCallback(Consumer<String> callback) {
        this.callback = callback;
    }

    @Override
    public void run() {
        sleep(WAIT_MILIS);
        callback.accept("FutureCallback");
    }
}
It decouples task execution from the callback: Once the computation is finished, the callback is invoked with the computation outcome (a.k.a. the static String for the sake of this example).

We can then instantiate the task by providing the callback function, and eventually start it:
startThread(() -> {
    new RunnableCallback(it -> print(it)).run();
});
Here, the callback is again defined using a lambda expression. (Of course, the callback function would typically execute much more sophisticated operations which require the context of the original caller of the asynchronous function. Otherwise, you wouldn’t use a callback function, but implement its operations just inside the asynchronous function itself.)

This looks pretty straightforward and it works. The task is executed asynchronously and the calling thread never blocks with polling. The callback gets invoked as specified.

This solution seems fine for this very simple case. But what about more complex cases? For instance,
  • We should probably have a dedicated callback function in case an Exception occurs during processing (an “error callback”).
  • We may even have special requirements such as starting a range of asynchronous tasks and fire a callback only when the last task has completed.
  • Most importantly though, the situation will quickly become messy once we begin to chain callback functions, e.g. if one callback triggers another asynchronous function which in turn takes its own callback, and so on. This is what especially in the JavaScript / Node.js world (where asynchronous operations are quite frequent) is known as callback hell.
  • Also, a callback must not be called multiple times as it is really thought to signal that the function has completed which happens only once; we’d have to implement this mechanism ourselves.
Promises provide us with a sensible mechanism to mitigate callback hell and write clean asynchronous code. With CompletableFuture, the respective API is now part of Java.

Make a promise

The default use case for a promise which is equivalent to above implementations with a future or a callback is to specify a function to be executed with the result returned by the original asynchronous function after its successful completion. Let’s see how to do this.
// 1a - build the task;
final CompletableFuture<String> retrieveName = CompletableFuture.supplyAsync(() -> {
    sleep(WAIT_MILIS);
    return "Promise";
});
// 1b - define task result processing
retrieveName.thenAccept(it -> print(it));

// 2 - start the task
startThread(() -> {
    try {
        retrieveName.get();
    } catch (InterruptedException | ExecutionException ex) {
        throw new RuntimeException(ex);
    }
});
First of all, we have to wrap our asynchronous function into a CompletableFuture which is the actual promise object. Here, the asynchronous task is implemented as a lambda expression.

The CompletableFuture / promise object now offers a multitude of methods which help us specify the promises’ “monad” behavior.

This basic use case is covered by the #thenAccept(…) method which executes the Consumer function with the result of the wrapped asynchronous function as soon as the latter has completed successfully. Here, the Consumer is implemented as a lambda expression, once again.

Finally, we have to execute the task. Therefore we use #get() to explicitly query for the return value of the CompletableFuture; this is actually the equivalent of a Future#get() call. Note that just as Future#get(), the method blocks the caller thread, so we put it in a separate thread.

This solution works fine. We were able to specify any subsequent action for the return value of the asynchronous task declaratively and independently of its actual invocation.

Nevertheless, we missed an important point here. Going back to the CompletableFuture’s API, it is defined as “a Future that may be explicitly completed”. But actually, we didn’t explicitly mark the promise as completed. Instead we invoked the underlying future which, upon completion, called the CompletableFuture’s callback function(s). This is essentially still the same as a vanilla Future. Let’s see how much more elegant asynchronous tasks get with explicit promise completion.

Fulfill a promise

This is the code necessary to define a CompletableFuture and complete it explicitly:
// 1a - build the task
final CompletableFuture<String> future = new CompletableFuture<>();
// 1b - define task result processing
future.thenAccept(it -> print(it));

// 2 - start the task
startThread(() -> {
    sleep(WAIT_MILIS);
    future.complete("Future explicitly fulfilled");
});
First of all, we instantiate a new CompletableFuture. Note that this really is an “empty pod”. The actual asynchronous task we want to manage with this promise object can and will be instantiated independently of the CompletableFuture object.

As in the previous example, we have the whole CompletableFuture API available on our promise object which we use to declaratively specify its callbacks. Once again, we use a lambda expression in order to specify to invoke print on the return value.

Then we build and start the thread we will observe with the CompletableFuture object. We will now explicitly call the respective callback function of the CompletableFuture object to signal that the task has completed. We do this by using #complete(…) with the desired return value. Of course, the asynchronous task must have a reference of the CompletableFuture to invoke this method. Because now, the asynchronous task does not need to have a return value because we’ll never query for it explicitly, we can just implement it as a fire & forget Runnable (which is here implemented as a lambda expression).

And that’s it. Explicitly completing the promise fires the respective event handler. Thus, CompletableFuture really is a useful API to save you from building asynchronous function callbacks and it covers a lot of use cases as we shall see in the next section.

OK, but what is it actually good for?

As we just saw, CompletableFuture really is just a callback handling facility for a Future. A Future is a means to collect output data from asynchronous tasks. Hence, a CompletableFuture is useful whenever you want to collect the result of an asynchronous task or multiple asynchronous tasks. The two most typical use cases are:
  • Parallel computing on multiple CPU cores
  • Remote method invocation through e.g. SOA or REST calls through a remote interface
In both of these cases, you want to eventually collect the computation output, and CompletableFuture gives you a means to specify what should be done with the output once it is received in a functional, declarative and reactive programming style which is well-suited for this situation.

All you have to do to enable promise-style programming for any asynchronous method is to provide the method with a promise object and fire its completion callbacks accordingly.

For the rest of this article, I will stick to the “execute a Thread with a Thread#sleep(…) invocation” just because it’s the most simple thing to demonstrate any asynchronous use case.

Pages: 1 2 3 4