July 26, 2015

CompletableFuture: Clean callbacks with Java 8’s Promises (part 2 of 4)


Pages: 1 2 3 4

Promises basics with CompletableFuture

Fulfill a promise

This is covered in the first introductory example in the section above.
  • We explicitly fulfill a promise by calling #complete(…).
  • The respective listener is registered with #thenAccept(…) (takes the promise outcome as an argument) / #thenRun(…) (takes a Runnable as the argument, no access to the promise outcome).
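As a minimal sketch of the two listener types (the class name FulfillDemo and the events list are made up for illustration), the following registers one #thenAccept(…) and one #thenRun(…) listener and then fulfills the promise. The order in which the two listeners fire is not something to rely on, so the sketch only collects what happened:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; the name and the "events" list are for illustration only.
class FulfillDemo {

    static List<String> run() {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> promise = new CompletableFuture<>();

        // thenAccept receives the outcome of the promise
        promise.thenAccept(value -> events.add("accepted: " + value));
        // thenRun takes a Runnable and never sees the outcome
        promise.thenRun(() -> events.add("done"));

        // Nothing has run so far; fulfilling the promise fires both listeners
        // synchronously on the calling thread
        promise.complete("hello");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```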

Reject a promise

Rejecting a promise means signaling an error, i.e. the future did not complete normally but with an exception. As with return values, you don’t let your actual task throw the exception; instead, you inform the listeners registered on the CompletableFuture that an exception occurred:
// 1a - build the task
final CompletableFuture<String> promise = new CompletableFuture<>();
// 1b - define task result processing
promise.exceptionally(it -> log(it));

// 2 - start the task
startThread(() -> {
    sleep(WAIT_MILIS);
    promise.completeExceptionally(new PromiseRejectedException("Promise rejected"));
    
    try {
        promise.get();
    } catch (InterruptedException ex) {
        throw new RuntimeException(ex);
    } catch (ExecutionException ex) {
        Assert.assertEquals(PromiseTestUtil.PromiseRejectedException.class, ex.getCause().getClass());
    }
});
  • We explicitly reject a promise by calling #completeExceptionally(…).
  • The respective listener is registered with #exceptionally(…).
Note that the exception listener throws an exception once its job is done (here, it wraps the original exception in a RuntimeException and re-throws it).
private String log(Throwable ex) {
    // do something with input, e.g. print it
    throw new RuntimeException(ex);
}
This is important. We will come back to this presently.

Fulfill and reject a promise?

As promises replace callbacks which signal successful or exceptional completion of a function, they must be fulfilled at most once. As I mentioned earlier, this is built into CompletableFuture: you can fulfill or reject a promise only once; a fulfilled promise cannot be rejected and vice versa; any additional call is ignored and no callback is invoked. The API provides access to the current status of a promise:
  • #isDone() returns true if the promise has been either completed normally or exceptionally already.
  • #isCompletedExceptionally() returns true if the promise has been completed exceptionally already.
Moreover, the return value of the event fire methods indicates whether the call actually changed the status of the promise:
  • When fulfilling a promise by calling #complete(…), the return value is true if this call completed the promise. It is false if the promise has already been “done”, in which case no listener will be invoked.
  • When rejecting a promise by calling #completeExceptionally(…), the return value is true if this call completed the promise. It is false if the promise has already been “done”, in which case no listener will be invoked.
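The status methods and return values can be checked with a small sketch like this one (the class name CompleteOnceDemo and the summary string are assumptions made for illustration). It completes a promise once, then tries to complete and to reject it again:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, for illustration only.
class CompleteOnceDemo {

    static String run() {
        CompletableFuture<String> promise = new CompletableFuture<>();

        boolean doneBefore = promise.isDone();
        // First call transitions the promise to "done"
        boolean first = promise.complete("first");
        // Any later attempt to fulfill or reject is ignored
        boolean second = promise.complete("second");
        boolean rejected = promise.completeExceptionally(new IllegalStateException("too late"));

        return doneBefore + " " + first + " " + second + " " + rejected + " "
                + promise.isDone() + " " + promise.isCompletedExceptionally() + " "
                + promise.join();
    }

    public static void main(String[] args) {
        // prints "false true false false true false first"
        System.out.println(run());
    }
}
```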

However, note that if you specify a promise to have multiple callbacks like this:
promise.thenAccept(it -> print(it));
promise.thenAccept(it -> print(it));
you essentially say: If this promise gets fulfilled, fire both these callbacks. This is perfectly valid. Fulfilling a promise will then invoke both callbacks, but any subsequent fulfilling will not invoke any callbacks.

Having said that, Java wouldn’t be Java if there wasn’t any hackish way to undermine this mechanism. There actually is:
  • #obtrudeValue(…) sets or re-sets the value retrieved by #get().
  • #obtrudeException(…) sets or re-sets #get() to throw the exception provided.
Note that neither of those methods re-fires callbacks that have already run. On a completed promise, they really just change the outcome of calling #get(). Because they mess with about everything promises are built for, I strongly advise you to never use these methods at all.
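To see what this looks like on an already fulfilled promise, here is a small sketch (the class name ObtrudeDemo and the seen list are made up for illustration). The listener runs once with the original value; after #obtrudeValue(…), only the value returned by #join() / #get() changes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, for illustration only.
class ObtrudeDemo {

    static List<String> run() {
        List<String> seen = new ArrayList<>();
        CompletableFuture<String> promise = new CompletableFuture<>();
        promise.thenAccept(value -> seen.add("callback: " + value));

        // Fires the listener once, with the original value
        promise.complete("original");
        // Overwrites the stored outcome; the listener above has already run
        // and is not invoked a second time
        promise.obtrudeValue("replaced");

        seen.add("join: " + promise.join());
        return seen;
    }

    public static void main(String[] args) {
        // prints "[callback: original, join: replaced]"
        System.out.println(run());
    }
}
```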

CompletableFuture callbacks flavors

If you have ventured into the CompletableFuture API already, you may have noticed that some of the myriad methods defined for the class really are different flavors of the basic methods we already covered here. I’d like to call them flavors rather than types, because you can intermix them, resulting in a vast number of variations.

Intermediary and terminal callbacks

We will later explore how to chain or “pipe” callbacks, i.e. use the return value of one callback as the input value of another one. However, so far we only used terminal callbacks, i.e. callbacks which do not return a value (they implement the Consumer functional interface). These are not eligible for chaining.

Of course you must define a callback function which returns a value of any type in order to chain / pipe it to another callback. That’s what intermediary callbacks are for (these implement the Function functional interface).
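A minimal sketch of the difference (the class name IntermediaryDemo is made up for illustration): #thenApply(…) takes a Function and passes its return value on, whereas #thenAccept(…) takes a Consumer and leaves nothing to pass on.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, for illustration only.
class IntermediaryDemo {

    static int run() {
        CompletableFuture<String> promise = new CompletableFuture<>();

        // Intermediary: a Function<String, Integer>; its result feeds the next stage
        CompletableFuture<Integer> length = promise.thenApply(String::length);
        // Terminal: a Consumer<Integer>; the resulting future carries no value (Void)
        CompletableFuture<Void> printed =
                length.thenAccept(n -> System.out.println("length = " + n));

        promise.complete("hello");
        printed.join(); // the terminal stage has finished at this point
        return length.join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```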

There’s an intermediary equivalent for these CompletableFuture methods we covered already:

                                 intermediary         terminal     terminal (Runnable)
Construct from task              static supplyAsync   -            static runAsync
complete listener                thenApply            thenAccept   thenRun
completeExceptionally listener   exceptionally        -            -

(Excerpt from the complete CompletableFuture callback API overview on the last page.)

Yes, unfortunately, the method names of terminal / intermediary callback registrators differ, and their resemblance is neither obvious nor intuitive.

Thread-independent callbacks

Most callbacks offer an “async” flavor as well; these are the methods ending with …Async(…). Rather than re-using the thread that completed the previous stage, they hand the callback to an executor, so it typically runs on a different thread. The somewhat intricate details are discussed in this blog post. We’ll come back to this when we cover callback chaining.

Callbacks with explicit Executor

Asynchronous-flavored callbacks optionally take an additional Executor argument; if provided, that executor runs the callback. Otherwise, according to the API, ForkJoinPool.commonPool() is used.
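The following sketch illustrates the explicit-Executor flavor. The class name AsyncExecutorDemo and the thread name "callback-thread" are assumptions made for illustration; the single-thread executor is just one possible choice. The callback reports the name of the thread it runs on:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, for illustration only.
class AsyncExecutorDemo {

    static String run() {
        // An executor whose only thread carries a recognizable name
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-thread"));
        try {
            CompletableFuture<String> promise = new CompletableFuture<>();
            // The callback is submitted to the executor instead of running
            // on the thread that calls complete(...)
            CompletableFuture<String> threadName =
                    promise.thenApplyAsync(value -> Thread.currentThread().getName(), executor);

            promise.complete("hello");
            return threadName.join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints "callback-thread"
        System.out.println(run());
    }
}
```

Omitting the executor argument would leave the choice of thread to the default described above.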

CompletableFuture auxiliary API

As we’re about to cover the entire CompletableFuture callback API, I’d like to briefly present some additional methods of CompletableFuture as well:
  • #get(…) blocks the thread and eventually retrieves the return value of the promise. Throws InterruptedException (threading-related) and ExecutionException (when the promise was rejected) as checked exceptions. You should always prefer this method over #join() if you’re interested in the return value as it forces you to check for rejection as well.
  • #getNow(…) tries to retrieve the return value of the promise immediately, or returns the provided default value; never blocks.
  • #join(): same as #get(), but throws unchecked exceptions only (a CompletionException wrapping the cause when the promise was rejected).
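The three retrieval methods can be compared in a short sketch (the class name RetrieveDemo and the result string are made up for illustration). It reads a pending promise with #getNow(…) and a rejected promise with both #get() and #join():

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class, for illustration only.
class RetrieveDemo {

    static String run() {
        // getNow never blocks: a pending promise yields the default value
        CompletableFuture<String> pending = new CompletableFuture<>();
        String fallback = pending.getNow("default");

        CompletableFuture<String> rejected = new CompletableFuture<>();
        rejected.completeExceptionally(new IllegalStateException("boom"));

        String viaGet;
        try {
            viaGet = rejected.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            viaGet = "interrupted";
        } catch (ExecutionException ex) {
            // checked exception; the original exception is the cause
            viaGet = "get: " + ex.getCause().getMessage();
        }

        String viaJoin;
        try {
            viaJoin = rejected.join();
        } catch (CompletionException ex) {
            // unchecked exception; the compiler does not force this catch
            viaJoin = "join: " + ex.getCause().getMessage();
        }
        return fallback + ", " + viaGet + ", " + viaJoin;
    }

    public static void main(String[] args) {
        // prints "default, get: boom, join: boom"
        System.out.println(run());
    }
}
```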

CompletableFuture callbacks, advanced

Combining callbacks

Of course, we can register both a fulfill callback as well as a reject callback. One way to do so is by registering them individually like this:
promise.thenAccept(it -> print(it));
promise.exceptionally(it -> log(it));

Note that both methods return a new CompletableFuture object; yet, we need to register both callbacks on the same original object, so do not use builder-style method chaining here.

An equivalent way is to provide a combined callback with #whenComplete(…):
promise.whenComplete((it, err) -> {
    // check err rather than it: a promise may legitimately be fulfilled with null
    if (err == null) {
        print(it);
    } else {
        log(err);
    }
});
Note that it’s important to register a reject callback whenever you expect an exception to happen: the “normal” fulfill callback is not triggered when a promise is rejected, and thus you might “miss” completion information.

We can add to our list of CompletableFuture methods:

                                                       intermediary   terminal       terminal (Runnable)
Combined complete and completeExceptionally listener   handle         whenComplete   -
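For the intermediary variant, #handle(…), a minimal sketch could look like this (the class name HandleDemo and the describe helper are made up for illustration). Unlike #whenComplete(…), the callback returns a value, which becomes the outcome of the resulting future:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, for illustration only.
class HandleDemo {

    // Maps either outcome of the promise to a String
    static String describe(CompletableFuture<String> promise) {
        return promise
                .handle((value, err) -> err == null
                        ? "ok: " + value
                        : "failed: " + err.getMessage())
                .join();
    }

    static String run() {
        CompletableFuture<String> fulfilled = new CompletableFuture<>();
        fulfilled.complete("hello");

        CompletableFuture<String> rejected = new CompletableFuture<>();
        rejected.completeExceptionally(new IllegalStateException("boom"));

        return describe(fulfilled) + " / " + describe(rejected);
    }

    public static void main(String[] args) {
        // prints "ok: hello / failed: boom"
        System.out.println(run());
    }
}
```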

Multiple callbacks

Just as you can register both a fulfill and a reject callback, you can actually register an arbitrary number of both fulfill and reject callbacks!

For instance, defining these two callbacks:
promise.thenAccept(it -> print(it));
promise.thenAccept(it -> print(it));
would fire it -> print(it) twice when the promise is fulfilled. Note that this is not the same as chaining callbacks as covered on the next page.
