July 26, 2015

CompletableFuture: Clean callbacks with Java 8’s Promises (part 3 of 4)



CompletableFuture callbacks, advanced (continued)

Chaining callbacks

Now comes the interesting part: chaining callbacks. In a classic, purely callback-based implementation, this is where code quickly devolves into deeply nested “callback hell”.

The most basic form chains two fulfill callbacks, “piping” the result of the first one into the input of the second one:
promise.thenApply(it -> transform(it)).thenAccept(it -> print(it));
Here the difference between intermediary and terminal callbacks discussed earlier shows up in practice. Because the return value of the transform(…) method must be “piped” into the input of the print(…) method, transform(…) must return a value (it cannot be void). The CompletableFuture API therefore forces us to register the intermediary callback with #thenApply(…) and the terminal callback with #thenAccept(…).
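To make the typing visible, here is a minimal sketch of the same two-step chain. The class name ChainingSketch, the upper-casing body of transform(…) and the list that stands in for print(…) are assumptions made for illustration; only #thenApply(…) and #thenAccept(…) come from the CompletableFuture API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ChainingSketch {
    // Hypothetical stand-in for the article's transform(…): it must return a value.
    static String transform(String input) {
        return input.toUpperCase();
    }

    // Runs the two-step chain and returns what the terminal callback received.
    static List<String> run(String value) {
        List<String> printed = new ArrayList<>(); // stands in for print(…)
        CompletableFuture<String> promise = new CompletableFuture<>();

        // thenApply takes a Function, so the resulting stage carries a String onward.
        CompletableFuture<String> transformed = promise.thenApply(it -> transform(it));
        // thenAccept takes a Consumer, so the resulting stage carries no value (Void).
        CompletableFuture<Void> done = transformed.thenAccept(it -> printed.add(it));

        promise.complete(value); // both callbacks run here, on the completing thread
        done.join();             // already complete at this point
        return printed;
    }
}
```

Calling ChainingSketch.run("hello") returns a list containing the single entry "HELLO". Nothing can be chained onto the CompletableFuture<Void> that would still see the transformed value, which is why #thenAccept(…) behaves as the terminal step.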

But wait – is this any different from naïvely chaining the callbacks like this…?
promise.thenAccept(it -> { print(transform(it));} );
Yes, it is.
  • First of all, you would have to handle exceptions manually, i.e. distinguish whether the inner or the outer function threw an exception, and react accordingly. The example has no exception handling (through callback rejection) at all, but this matters especially when registering a combined callback handler with handle(…) (intermediary) / whenComplete(…) (terminal).
  • Through the asynchronous callbacks which we covered very briefly (the methods ending in …Async(…)), we can make any part of the callback chain run asynchronously, with complete control over thread management.
Let’s not fall back into the old “callback” way of thinking.
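As a rough illustration of the second point, the sketch below runs only the transforming step on a dedicated executor. The class name AsyncStageSketch and the thread name "transform-pool" are made up for this example; #thenApplyAsync(…) with an Executor argument is part of the CompletableFuture API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncStageSketch {
    // Returns the name of the thread that executed the intermediary callback.
    static String transformThreadName() {
        // Hypothetical single-thread pool with a recognizable thread name.
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "transform-pool"));
        try {
            CompletableFuture<String> promise = new CompletableFuture<>();
            // Only this stage is handed to the executor; the rest of the chain is unaffected.
            CompletableFuture<String> threadName =
                    promise.thenApplyAsync(it -> Thread.currentThread().getName(), pool);
            promise.complete("value");
            return threadName.join(); // wait for the pool thread to finish the callback
        } finally {
            pool.shutdown();
        }
    }
}
```

AsyncStageSketch.transformThreadName() returns "transform-pool", showing that the callback ran on the executor we passed in rather than on the thread that completed the promise.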

Remember that inside the actual callback function, the control flow is no longer controlled by invoking #complete(…) / #completeExceptionally(…) on a promise object, but implicitly by the callback’s return value / by the exception it throws. More on that in the next section:

Callback chaining fall-through with eventual exception handling

It’s important to realize that the callback chain is traversed in sequence to find the next fulfill / reject handler. Thus, in order to make sure no exception thrown within the callback chain gets missed, it’s good practice to finalize a callback chain with either a rejection handler or a combined handler:
promise.<String> thenApply(it -> {throw new RuntimeException("Promise rejected");})
        .thenApply(it -> transform(it))
        .thenApply(it -> transform(it))
        .whenComplete((it, err) -> {
            if (it != null) {
                print(it);
            } else {
                logAccidentalException((CompletionException) err);
            }
        });

startThread(() -> {
    sleep(WAIT_MILIS);
    promise.complete("Future explicitly fulfilled");
});
In this example, neither of the it -> transform(it) callbacks gets invoked. Once the first callback has thrown the RuntimeException, control falls straight through to the “error” case of #whenComplete(…).

Note that in this example, promise rejection really happens implicitly in a callback handler, i.e. by throwing an exception, as mentioned in the previous section. However, in this case, the exception “piped” into the next rejection handler would not be the original business exception, but that business exception wrapped in a CompletionException:
private String logAccidentalException(CompletionException ex) {
    // do something with input, e.g. print it
    throw new RuntimeException(ex);
}
(Because of CompletableFuture API shortcomings, we still have to cast it when registering the callback with #whenComplete(…).)
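The fall-through and the wrapping can both be observed in a small self-contained sketch. The class name FallThroughSketch, the choice of IllegalStateException as the “business” exception and the log list are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class FallThroughSketch {
    // Returns a record of which callbacks ran and what the final handler saw.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> promise = new CompletableFuture<>();

        promise.<String>thenApply(it -> { throw new IllegalStateException("Promise rejected"); })
                // Skipped: the previous stage was rejected, so this fulfill callback never runs.
                .thenApply(it -> { log.add("transform called"); return it; })
                .whenComplete((it, err) -> {
                    if (err == null) {
                        log.add("value: " + it);
                    } else {
                        // err is the wrapper; the original exception is its cause.
                        log.add(err.getClass().getSimpleName()
                                + " caused by " + err.getCause().getClass().getSimpleName());
                    }
                });

        promise.complete("Future explicitly fulfilled");
        return log;
    }
}
```

FallThroughSketch.run() returns a single entry, "CompletionException caused by IllegalStateException". The sketch tests err == null instead of it != null because a promise may legitimately be fulfilled with null. Note also that the wrapping happens only for exceptions travelling through the chain: a handler registered directly on a promise that was rejected via #completeExceptionally(…) receives the original exception unwrapped, so the cast is only safe after at least one intermediary stage.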

Using callback chaining for exception handling

If you think about it, you could really use promises as a mini-DSL (domain specific language) for exception handling:
String output = CompletableFuture.supplyAsync(() -> {
    // do stuff
    return "Promise";
}).whenComplete((it, err) -> {
    if (it != null) {
        print(it);
    }
    else {
        log(err);
    }
}).get();
That’s pretty straightforward. If the method succeeded, you process the output in the if branch of #whenComplete(…); otherwise, you handle the exception in the else branch.

(I wouldn’t recommend using that in practice, though, as you should stick to Java’s generally accepted exception handling standards.)

Using callback chaining to recover from exceptions

A useful application of callback chaining is recovering from an exception thrown by a previous function call. Remember that when we introduced the reject callback, we emphasized that the respective callback handler must itself throw an exception?
private String log(Throwable ex) {
    // do something with input, e.g. print it
    throw new RuntimeException(ex);
}
If an exception is thrown by a previous function call, the reject callback gets invoked; because that callback throws again, any subsequent fulfill callback is skipped.

In this example, which fires an exceptional callback, log(it) gets called, but the subsequent print(it) does not:
// 1a - build the task
final CompletableFuture<String> promise = new CompletableFuture<>();
// 1b - define task result processing
promise.exceptionally(it -> log(it)).thenAccept(it -> print(it));

// 2 - start the task
startThread(() -> {
    sleep(WAIT_MILIS);
    promise.completeExceptionally(new MyPromiseRejectedException("Promise rejected"));
});
However, by returning a proper value from a reject callback, we tell the chain mechanism that we have “recovered” from the exception, so the subsequent fulfill callback is invoked.

Consider this implementation of fix(…):
private String fix(Throwable ex) {
    // do something with input, e.g. print it
    return "Recovered";
}
In this example, which fires an exceptional callback, fix(it) gets called and returns a proper value again, which triggers the subsequent print(it):
promise.exceptionally(it -> fix(it)).thenAccept(it -> print(it));
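The two variants can be compared side by side in one sketch. The class name RecoverySketch, the run(…) helper and the list that stands in for print(…) are assumptions for illustration; log(…) and fix(…) mirror the handlers shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class RecoverySketch {
    // Reject callback that throws again: the chain stays rejected.
    static String log(Throwable ex) {
        throw new RuntimeException(ex);
    }

    // Reject callback that returns a value: the chain counts as recovered.
    static String fix(Throwable ex) {
        return "Recovered";
    }

    // Rejects a promise and returns whatever reached the fulfill callback.
    static List<String> run(Function<Throwable, String> rejectHandler) {
        List<String> printed = new ArrayList<>(); // stands in for print(…)
        CompletableFuture<String> promise = new CompletableFuture<>();
        promise.exceptionally(rejectHandler).thenAccept(it -> printed.add(it));
        promise.completeExceptionally(new IllegalStateException("Promise rejected"));
        return printed;
    }
}
```

RecoverySketch.run(RecoverySketch::log) returns an empty list, because the fulfill callback is skipped, whereas RecoverySketch.run(RecoverySketch::fix) returns a list containing "Recovered".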

Dynamic callback chaining

If you need the power to dynamically define which promise to “pipe” into, e.g. based on the outcome of the previous promise, use #thenCompose(…) rather than #thenAccept(…) / #thenRun(…) / #thenApply(…):
// 1a - build the task
final CompletableFuture<String> promise1 = new CompletableFuture<>();
final CompletableFuture<String> promise2 = new CompletableFuture<>();
// 1b - define task result processing
promise1.thenAccept(it -> print(it)).thenCompose(it1 -> promise2);
promise2.thenAccept(it2 -> print(it2));

// 2 - start the task
startThread(() -> {
    sleep(WAIT_MILIS);
    promise1.complete("Future 1 explicitly fulfilled");
    promise2.complete("Future 2 explicitly fulfilled");
});
For #thenCompose(…), the function provided must thus return the next promise in the chain.
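To show the “dynamic” aspect more explicitly, the following sketch picks the next promise based on the value of the previous one. The class name ComposeSketch, the methods loadCached(…) and loadRemote(…) and the "hot-" prefix rule are all hypothetical; only #thenCompose(…), completedFuture(…) and supplyAsync(…) are taken from the CompletableFuture API.

```java
import java.util.concurrent.CompletableFuture;

class ComposeSketch {
    // Hypothetical follow-up task that is already fulfilled when returned.
    static CompletableFuture<String> loadCached(String key) {
        return CompletableFuture.completedFuture("cached:" + key);
    }

    // Hypothetical follow-up task that is fulfilled asynchronously.
    static CompletableFuture<String> loadRemote(String key) {
        return CompletableFuture.supplyAsync(() -> "remote:" + key);
    }

    // The function passed to thenCompose decides at run time which promise continues the chain.
    static String resolve(String key) {
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> chained =
                first.thenCompose(it -> it.startsWith("hot-") ? loadCached(it) : loadRemote(it));
        first.complete(key);
        return chained.join(); // value of whichever promise the function returned
    }
}
```

ComposeSketch.resolve("hot-a") yields "cached:hot-a", while ComposeSketch.resolve("b") yields "remote:b". With #thenApply(…) the same lambda would produce a nested CompletableFuture<CompletableFuture<String>>; #thenCompose(…) flattens it into a single stage.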
