July 26, 2015

CompletableFuture: Clean callbacks with Java 8’s Promises (part 4 of 4)


Pages: 1 2 3 4

CompletableFuture callbacks, advanced (continued)

Collecting and combining results

A typical use case for asynchronous computation is to execute multiple functions in parallel and then collect their results once they’re all finished. For instance, you might run several remote method invocations in parallel to receive different entities, and then combine the results into one entity collection.

Fortunately, CompletableFuture comes with an API which specifically covers these cases.

Wait for all promises to finish, and collect result

This is the implementation of an “AND”-combination of several promises. It can be achieved by combining several promises with the static #allOf(…) method:
// 1a - build the task
final CompletableFuture<String> promise1 = new CompletableFuture<>();
final CompletableFuture<String> promise2 = new CompletableFuture<>();
// 1b - define task result processing
final CompletableFuture<Void> promiseCombined = 
        // Note that "it" is of type Void.
        CompletableFuture.allOf(promise1, promise2).thenAccept(it -> inform(it));

// 2 - start the task
startThread(() -> {
    sleep(WAIT_MILIS);
    promise1.complete("Future 1 explicitly fulfilled");
    promise2.complete("Future 2 explicitly fulfilled");
});
Well, kind of. Unfortunately, the resulting combined promise is of type <Void>, which leads to the ridiculous need to create a listener that takes a single parameter of type Void. Typically, we would much rather have a resulting promise that works with the collected results of the combined promises, i.e. one of type <List<…>>.

This blog post (go take a look; they cover CompletableFuture in much more detail) presents a solution for that which I have adapted here.
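As a rough illustration, such a helper could look like the following. This is a minimal sketch, not necessarily the adapted code itself: only the class name CompletableFutureUtil and the method name allOf come from this article, while the signature and the body are assumptions. It waits for all promises via the JDK's own allOf and then reads each (already completed) result with join():

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Sketch of a helper class; the signature and body are assumptions.
final class CompletableFutureUtil {
    private CompletableFutureUtil() {}

    // Returns a promise which completes with the results of all given promises,
    // in the order in which the promises were passed in.
    @SafeVarargs
    static <T> CompletableFuture<List<T>> allOf(CompletableFuture<T>... promises) {
        return CompletableFuture.allOf(promises)
                // When this callback runs, every promise has completed,
                // so join() returns immediately instead of blocking.
                .thenApply(ignored -> Arrays.stream(promises)
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }
}

class AllOfHelperDemo {
    public static void main(String[] args) {
        CompletableFuture<String> promise1 = new CompletableFuture<>();
        CompletableFuture<String> promise2 = new CompletableFuture<>();
        CompletableFuture<List<String>> all = CompletableFutureUtil.allOf(promise1, promise2);

        promise1.complete("Future 1 explicitly fulfilled");
        promise2.complete("Future 2 explicitly fulfilled");
        all.join().forEach(System.out::println);
    }
}
```

If one of the promises completes exceptionally, the combined promise in this sketch completes exceptionally as well, and the list is never built.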

Now, we can use the new #allOf(…) to collect the results of multiple promises:
final CompletableFuture<Void> promiseCombined = 
    CompletableFutureUtil.allOf(promise1, promise2).
        thenAccept(all -> all.stream().forEach(it -> print(it)));

Wait for both promises to finish, and collect result

If you just want to wait for exactly two promises, there’s an alternative syntax to combine the second with the first more explicitly through #thenCombine(…):
final CompletableFuture<String> promise1 = new CompletableFuture<>();
final CompletableFuture<String> promise2 = new CompletableFuture<>();

promise1.thenCombine(promise2, (v1, v2) -> combine(v1, v2)).thenAccept(it -> print(it));
There are actually “combining” variations of all three main callback methods:
  • To AND-combine two #thenApply(…) (intermediary) into one operation, use #thenCombine(…)
  • To AND-combine two #thenAccept(…) (terminal) into one operation, use #thenAcceptBoth(…)
  • To AND-combine two #thenRun(…) (terminal) into one operation, use #runAfterBoth(…)
Again, you are apparently not supposed to find any inherent logic in that naming scheme…
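To see the three AND-variations listed above side by side, here is a small sketch. The class name AndCombinationDemo and the log list are hypothetical and used only for illustration; the three methods themselves are part of the CompletableFuture API. The promises are completed on the calling thread, so the callbacks run synchronously there:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; only the CompletableFuture calls are real API.
class AndCombinationDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> promise1 = new CompletableFuture<>();
        CompletableFuture<String> promise2 = new CompletableFuture<>();

        // Intermediary: combines both values into a new promise.
        promise1.thenCombine(promise2, (v1, v2) -> v1 + " & " + v2)
                .thenAccept(it -> log.add("thenCombine: " + it));
        // Terminal: consumes both values.
        promise1.thenAcceptBoth(promise2, (v1, v2) -> log.add("thenAcceptBoth: " + v1 + ", " + v2));
        // Terminal (Runnable): ignores the values.
        promise1.runAfterBoth(promise2, () -> log.add("runAfterBoth"));

        promise1.complete("one"); // no callback fires yet
        promise2.complete("two"); // now all three callbacks fire
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The order in which the three callbacks are invoked is an implementation detail, so the sketch does not rely on it.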

Wait for the first promise to finish

The opposite of waiting for all promises to finish is, of course, to wait for just the first one. This would then be the “OR”-combination. As one might expect, there’s an API for that as well: it’s the #anyOf(…) method.
// 1a - build the task
final CompletableFuture<String> promise1 = new CompletableFuture<>();
final CompletableFuture<String> promise2 = new CompletableFuture<>();
// 1b - define task result processing
final CompletableFuture<Void> promiseCombined = 
        // Note that "it" is of type Object.
        CompletableFuture.anyOf(promise1, promise2).thenAccept((it) -> print(it));

// 2 - start the task
startThread(() -> {
    sleep(WAIT_MILIS);
    promise1.complete("Future 1 explicitly fulfilled");
    promise2.complete("Future 2 explicitly fulfilled");
});
Unfortunately, this method has a drawback as well (sigh): because #anyOf(…) accepts promises of type <?>, the resulting combined promise is of type <Object>. However, I think that typically, the result types of the combined promises would match, and then it would be sensible for the combined promise to be of that very type. Hence, I created another helper method which just stupidly casts the combined promise.
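A helper along these lines could be sketched as follows. Again, only the names CompletableFutureUtil and anyOf come from this article; the signature and body are assumptions. Note that this sketch casts the result value inside a thenApply callback rather than casting the promise object itself:

```java
import java.util.concurrent.CompletableFuture;

// Sketch of a helper class; the signature and body are assumptions.
final class CompletableFutureUtil {
    private CompletableFutureUtil() {}

    // Returns a promise which completes with the result of whichever
    // of the given promises completes first.
    @SafeVarargs
    @SuppressWarnings("unchecked")
    static <T> CompletableFuture<T> anyOf(CompletableFuture<T>... promises) {
        // The JDK's anyOf yields CompletableFuture<Object>. The cast is safe
        // here because every input promise is declared to produce a T.
        return CompletableFuture.anyOf(promises).thenApply(result -> (T) result);
    }
}

class AnyOfHelperDemo {
    public static void main(String[] args) {
        CompletableFuture<String> promise1 = new CompletableFuture<>();
        CompletableFuture<String> promise2 = new CompletableFuture<>();
        CompletableFuture<String> first = CompletableFutureUtil.anyOf(promise1, promise2);

        promise2.complete("Future 2 explicitly fulfilled");
        System.out.println(first.join());
    }
}
```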

This method can then be used without explicit casting:
final CompletableFuture<Void> promiseCombined = 
    CompletableFutureUtil.anyOf(promise1, promise2).thenAccept((it) -> print(it));

Wait for the first out of two promises to finish

Again, there is a special syntax for the case where there are exactly two promises you want to combine. Consider this example of #applyToEither(…):
final CompletableFuture<String> promise1 = new CompletableFuture<>();
final CompletableFuture<String> promise2 = new CompletableFuture<>();

promise1.applyToEither(promise2, it -> transform(it)).thenAccept(it -> print(it));
Yes, this is actually the counterpart of #thenCombine(…), with the callback not applied to the return values of both promises, but only to the value of the one which completed first.

Of course, there are “combining” variations of all three main callback methods again:
  • To OR-combine two #thenApply(…) (intermediary) into one operation, use #applyToEither(…)
  • To OR-combine two #thenAccept(…) (terminal) into one operation, use #acceptEither(…)
  • To OR-combine two #thenRun(…) (terminal) into one operation, use #runAfterEither(…)
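The three OR-variations listed above can be compared in a small sketch. The class name OrCombinationDemo and the log list are hypothetical; the three methods are part of the CompletableFuture API. Here promise2 is deliberately completed first, so all callbacks see its value:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; only the CompletableFuture calls are real API.
class OrCombinationDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> promise1 = new CompletableFuture<>();
        CompletableFuture<String> promise2 = new CompletableFuture<>();

        // Intermediary: transforms the value of whichever promise completes first.
        promise1.applyToEither(promise2, it -> it.toUpperCase())
                .thenAccept(it -> log.add("applyToEither: " + it));
        // Terminal: consumes the value of whichever promise completes first.
        promise1.acceptEither(promise2, it -> log.add("acceptEither: " + it));
        // Terminal (Runnable): runs as soon as either promise completes.
        promise1.runAfterEither(promise2, () -> log.add("runAfterEither"));

        promise2.complete("two"); // first to finish: all three callbacks fire
        promise1.complete("one"); // too late: the callbacks do not fire again
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```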

CompletableFuture API overview

For your reference, here’s a brief overview of the CompletableFuture methods covered in this article. I’ve arranged them into the three groups of intermediary, terminal and terminal (with Runnable parameter) methods. Note that only the method names are used here; parameters are omitted.
  • A (asynchronous) means that there’s an additional method starting with the same name, but ending with …Async for asynchronous callback invocation.
  • E (Executor) means that there’s an additional asynchronous (A) method with an additional Executor parameter for explicit Executor assignment.
                                                     | intermediary       | terminal       | terminal (Runnable) | A | E
-----------------------------------------------------+--------------------+----------------+---------------------+---+---
Construct from task                                  | static supplyAsync |                | static runAsync     |   | X
complete listener                                    | thenApply          | thenAccept     | thenRun             | X | X
completeExceptionally listener                       | exceptionally      |                |                     |   |
Combined complete and completeExceptionally listener | handle             | whenComplete   |                     | X | X
Dynamic callback chaining                            | thenCompose        |                |                     | X | X
AND all combining                                    | static allOf       |                |                     |   |
AND both combining                                   | thenCombine        | thenAcceptBoth | runAfterBoth        | X | X
OR any combining                                     | static anyOf       |                |                     |   |
OR either combining                                  | applyToEither      | acceptEither   | runAfterEither      | X | X
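To make the A and E columns more concrete, here is a small sketch using #thenApply(…) and its two asynchronous siblings. The class name AsyncVariantsDemo and the thread name "custom-executor" are made up for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; only the CompletableFuture calls are real API.
class AsyncVariantsDemo {

    static String run() {
        // A single named thread, so we can see where the last callback runs.
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "custom-executor"));
        try {
            return CompletableFuture
                    // E: task construction with an explicit Executor
                    .supplyAsync(() -> "value", executor)
                    // plain variant: no dedicated thread is requested
                    .thenApply(String::toUpperCase)
                    // A: runs on the default executor (normally the common ForkJoinPool)
                    .thenApplyAsync(it -> it + "!")
                    // E: runs on the explicitly assigned Executor
                    .thenApplyAsync(it -> it + " on " + Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "VALUE! on custom-executor"
    }
}
```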

Conclusion

As I wrote in the introduction, I am sure promises are a great addition to the Java language and I’d love to use them in a more complex application when given the opportunity. They really help to write clean, functional, reactive code, which is especially useful in a multi-threaded, asynchronously run environment. They are just a great concept!

On the other hand, their current implementation in Java 8 leaves much to be desired, at least in my opinion. If you thought that the Lambda API is a mess (and you have every right to do so), CompletableFuture takes this to a whole new level. The API is bloated and unintuitive, and some features are just very poorly implemented.

This really is a shame because instead of helping us to properly apply the somewhat complex concept of promises, it further obscures it, depriving it of its potential as a straightforward threading tool. I cannot understand why this highly complex API is not at least accompanied by an official tutorial!

Nevertheless, I hope that this article helped you to better understand the concept of promises and their application in Java 8 through the CompletableFuture API. By all means, if anything is unclear or if you find any important information missing, please let me know in the comments below.

You may also want to take a look at the GitHub repository which contains the full source code of these code samples as JUnit tests.

Update September 6, 2015: Meanwhile, I’ve created a small wrapper API for CompletableFuture which is much more concise, consistent and contains fixes for the flaws discussed in this article. It's part of the Collection API wrapper library LambdaOmega. Check out the accompanying blog post here or visit its GitHub page. Version 0.1 is now RELEASED!


