PolarSPARC |
Java 8 CompletableFuture :: Part 3
Bhaskar S | 11/04/2018 |
In Part 1 and Part 2 of this series, we demonstrated some of the capabilities and nuances of CompletableFuture. In this part, we will illustrate a few more features, including exception handling in CompletableFuture.
Oftentimes there are scenarios in which the outcome of either one of two tasks is enough to proceed to the next stage in a pipeline.
The following example demonstrates a pipeline with the ability to choose an output from either of the two asynchronous tasks based on which of the two completes first and consume that output:
Executing the program from Listing.8 will generate the following output:
[1] [ForkJoinPool.commonPool-worker-3] I am Awesome and am NIMBLE !!!
[2] [ForkJoinPool.commonPool-worker-9] I am Fast and am SLICK !!!
[3] [pool-1-thread-1] I am Elegant and am NEW !!!
Re-running the program from Listing.8 will generate the following output:
[1] [ForkJoinPool.commonPool-worker-5] I am Cool and am NIMBLE !!!
[2] [ForkJoinPool.commonPool-worker-9] I am Fast and am SLICK !!!
[3] [pool-1-thread-1] I am Quick and am NEW !!!
From Listing.8, notice that we introduce random delays (up to 500ms) in the tasks that make up the pipeline. This allows either one of the tasks to finish executing first, so the output can differ from run to run.
The following are some of the concepts in the context of the code in Listing.8:
acceptEither(CompletionStage<T>, Consumer<T>) :: This method takes two arguments - a reference to the type CompletionStage<T> and an instance of type Consumer<T>. The function in the second argument is executed when either the prior task (or stage) in the pipeline completes or the task wrapped in the first argument completes execution, whichever happens first. The tasks are executed using the default fork-join thread pool. The method returns an instance of CompletionStage<Void>
acceptEitherAsync(CompletionStage<T>, Consumer<T>) :: This method is similar to the method acceptEither(CompletionStage<T>, Consumer<T>), except that the specified Consumer<T> function is executed asynchronously using the default fork-join thread pool. The method returns an instance of CompletionStage<Void>
acceptEitherAsync(CompletionStage<T>, Consumer<T>, Executor) :: This method is similar to the method acceptEitherAsync(CompletionStage<T>, Consumer<T>), except that the specified Consumer<T> function is executed asynchronously using the specified custom executor. The method returns an instance of CompletionStage<Void>
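To make the behavior of acceptEither concrete, here is a minimal sketch (not the code from Listing.8). The class name AcceptEitherSketch, the helper delayed, and the fixed delays are assumptions chosen for illustration; fixed delays are used instead of random ones so that the winner is predictable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AcceptEitherSketch {
    // Hypothetical helper: sleeps for the given delay, then returns the text
    static String delayed(String text, long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return text;
    }

    // Races two asynchronous tasks; the Consumer receives whichever result arrives first
    static String firstOf(long delayA, long delayB) {
        AtomicReference<String> winner = new AtomicReference<>();
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> delayed("I am Fast", delayA));
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> delayed("I am Quick", delayB));
        // acceptEither returns a CompletableFuture<Void>; join() waits for the Consumer to finish
        a.acceptEither(b, winner::set).join();
        return winner.get();
    }

    public static void main(String[] args) {
        System.out.println(firstOf(50, 500));
    }
}
```

Swapping the two delays makes the second task win instead, which is the effect the random delays have from run to run.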
In the next example, we will demonstrate a pipeline with the ability to choose an output from either of the two asynchronous tasks based on which of the two completes first and apply a function on that output to produce a new result.
Executing the program from Listing.9 will generate the following output:
[1] [ForkJoinPool.commonPool-worker-5] I am Bold and am Cool !!!
[2] [ForkJoinPool.commonPool-worker-9] I am Fast and am New !!!
[2] [pool-1-thread-2] I am Practical and am Radical !!!
Re-running the program from Listing.9 will generate the following output:
[1] [ForkJoinPool.commonPool-worker-3] I am Awesome and am Cool !!!
[2] [ForkJoinPool.commonPool-worker-5] I am Elegant and am New !!!
[3] [pool-1-thread-2] I am Practical and am Radical !!!
The following are some of the concepts in the context of the code in Listing.9:
applyToEither(CompletionStage<T>, Function<T, U>) :: This method takes two arguments - the first argument is a reference to the type CompletionStage<T> and the second argument is an instance of type Function<T, U>. The function in the second argument is executed by consuming a value (of type T) produced by either the prior task (or stage) in the pipeline or the task wrapped in the first argument, whichever completes first, and generating an output value of type U. The tasks are executed using the default fork-join thread pool. The method returns an instance of CompletionStage<U>
applyToEitherAsync(CompletionStage<T>, Function<T, U>) :: This method is similar to the method applyToEither(CompletionStage<T>, Function<T, U>), except that the specified Function<T, U> function is executed asynchronously using the default fork-join thread pool. The method returns an instance of CompletionStage<U>
applyToEitherAsync(CompletionStage<T>, Function<T, U>, Executor) :: This method is similar to the method applyToEitherAsync(CompletionStage<T>, Function<T, U>), except that the specified Function<T, U> function is executed asynchronously using the specified custom executor. The method returns an instance of CompletionStage<U>
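The difference from acceptEither is that applyToEither produces a new value. The following is a minimal sketch (not the code from Listing.9); the class name ApplyToEitherSketch, the helper delayed, and the fixed delays are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class ApplyToEitherSketch {
    // Hypothetical helper: sleeps for the given delay, then returns the text
    static String delayed(String text, long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return text;
    }

    // Races two tasks and applies a Function to whichever result arrives first
    static String firstThenDecorate(long delayA, long delayB) {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> delayed("I am Bold", delayA));
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> delayed("I am Practical", delayB));
        // The Function maps the winning value (type T) to a new value (type U)
        CompletableFuture<String> result = a.applyToEither(b, s -> s + " and am Cool !!!");
        return result.join();
    }

    public static void main(String[] args) {
        System.out.println(firstThenDecorate(50, 500));
    }
}
```

Because the returned stage carries a value, further stages such as thenApply or thenAccept can be chained onto it.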
Business processes often experience exceptions in one or more stages of the pipeline. In the next example, we demonstrate how to handle exception cases. The pipeline consists of two asynchronous tasks generating two random numbers, which are consumed by the next task in the pipeline to generate a modulus value. If the modulus value is zero, we throw an exception.
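As a rough illustration of the kind of pipeline just described (a sketch under assumptions, not the code of Listing.10), the two numbers can be combined with thenCombine. The class name ModulusPipelineSketch, the method names, and the random number ranges are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;

class ModulusPipelineSketch {
    // Hypothetical combiner: returns n1 % n2, or throws when the modulus is zero
    static int modulus(int n1, int n2) {
        int mod = n1 % n2;
        if (mod == 0) {
            throw new RuntimeException("n1 = " + n1 + ", n2 = " + n2 + " => Invalid combination");
        }
        return mod;
    }

    // Two asynchronous tasks supply the numbers; thenCombine consumes both results
    static CompletableFuture<Integer> pipeline(int n1, int n2) {
        CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> n1);
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> n2);
        return first.thenCombine(second, ModulusPipelineSketch::modulus);
    }

    public static void main(String[] args) {
        // Assumed ranges; both bounds keep n2 non-zero
        int n1 = ThreadLocalRandom.current().nextInt(1, 1000);
        int n2 = ThreadLocalRandom.current().nextInt(1, 100);
        pipeline(n1, n2)
            .thenAccept(n -> System.out.println("Magic number is " + n))
            .exceptionally(ex -> {
                System.out.println("EXCEPTION:: " + ex.getMessage());
                return null;
            })
            .join();
    }
}
```

The pipeline method takes fixed inputs so that both the normal and the failing path can be reproduced on demand, for example pipeline(7, 4) versus pipeline(99, 1).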
Executing the program from Listing.10 will generate the following output:
[1] [pool-1-thread-4] Magic number is 6
[2] [pool-2-thread-4] Magic number is 10
[3] [pool-3-thread-4] Magic number is 7
[4] [main] Magic number is 11
Re-running the program from Listing.10 a few times will generate the following output:
[1] EXCEPTION:: java.lang.RuntimeException: n1 = 99, n2 = 1 => Invalid combination
[2] [pool-2-thread-4] Magic number is 2
[3] [pool-3-thread-4] Magic number is 1
[4] [main] Magic number is 8
When a task (or stage) in a pipeline throws an exception, the subsequent task(s) downstream that depend on its normal result are skipped, and the exception surfaces (wrapped in an ExecutionException) when the get() method is invoked on the CompletableFuture. This is evident from Output.15 above.
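The skipping behavior can be observed directly. In the minimal sketch below, the class name SkippedStageSketch and the flag downstreamRan are assumptions introduced for illustration; the first stage always fails, so the thenApply stage never runs and get() reports the original exception as the cause.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

class SkippedStageSketch {
    // Reports whether the downstream stage ran and which exception get() surfaced
    static String run() {
        AtomicBoolean downstreamRan = new AtomicBoolean(false);
        CompletableFuture<Integer> cf = CompletableFuture
            .<Integer>supplyAsync(() -> {
                throw new RuntimeException("Invalid combination");
            })
            // This stage needs the value from the failed stage, so it is skipped
            .thenApply(n -> {
                downstreamRan.set(true);
                return n + 1;
            });
        try {
            cf.get();
            return "no exception";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            // get() wraps the original exception; getCause() recovers it
            return "downstreamRan=" + downstreamRan.get() + ", cause=" + e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```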
Again, re-running the program from Listing.10 a few times will generate the following output:
[1] [pool-1-thread-4] Magic number is 3
[2] ERROR:: java.lang.RuntimeException: n1 = 962, n2 = 37 => Invalid combination
[2] [pool-2-thread-4] Magic number is -1
[3] [pool-3-thread-4] Magic number is 39
[4] [main] Magic number is 31
Once again, re-running the program from Listing.10 a few times will generate the following output:
[1] [pool-1-thread-4] Magic number is 12
[2] [pool-2-thread-4] Magic number is 19
[3] [pool-3-thread-4] Magic number is 43
[4] ERROR:: java.lang.RuntimeException: n1 = 27, n2 = 9 => Invalid combination
[4] EXCEPTION:: java.lang.RuntimeException: n1 = 27, n2 = 9 => Invalid combination
The following are some of the concepts in the context of the code in Listing.10:
exceptionally(Function<Throwable, T>) :: This method is defined on the interface CompletionStage and accepts an instance of type Function<Throwable, T>. If the previous task (or stage) completes normally, then this callback also completes normally and passes along the value from the previous task (of type T). On the other hand, if the previous task (or stage) encounters any exception, then this callback executes the specified function with the exception as the function argument and returns a different value (of the same type T), thereby effectively swallowing the thrown exception. This method returns an instance of CompletionStage<T>
handle(BiFunction<T, Throwable, U>) :: This method is defined on the interface CompletionStage and accepts an instance of type BiFunction<T, Throwable, U>. If the previous task (or stage) completes normally, this callback will execute the specified function with the value from the previous task (of type T) as the first argument and a null as the second argument (since there is no exception). However, if the previous task (or stage) encounters any exception, then this callback executes the specified function with a null as the first argument and the exception as the second argument. The specified function will return a value of type U on execution. This method allows one to swallow exceptions thrown by any task in the pipeline. This method returns an instance of CompletionStage<U>
whenComplete(BiConsumer<T, Throwable>) :: This method is defined on the interface CompletionStage and accepts an instance of type BiConsumer<T, Throwable>. If the previous task (or stage) completes normally, this callback will execute the specified function with the value from the previous task (of type T) as the first argument and a null as the second argument (since there is no exception). However, if the previous task (or stage) encounters any exception, then this callback executes the specified function with a null as the first argument and the exception as the second argument. The specified function will not return any results. This method will propagate any exceptions thrown by prior tasks in the pipeline. This method returns an instance of CompletionStage<T>
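The three methods above can be compared side by side. The following is a minimal sketch under assumptions: the class name RecoverySketch, its method names, and the fallback value -1 are chosen for illustration and are not taken from Listing.10.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class RecoverySketch {
    // Hypothetical stage: completes with n1 % n2, or fails when the modulus is zero
    static CompletableFuture<Integer> modulus(int n1, int n2) {
        return CompletableFuture.supplyAsync(() -> {
            int mod = n1 % n2;
            if (mod == 0) {
                throw new RuntimeException("Invalid combination");
            }
            return mod;
        });
    }

    // exceptionally: replaces a failure with a fallback value of the same type T
    static int withExceptionally(int n1, int n2) {
        return modulus(n1, n2).exceptionally(ex -> -1).join();
    }

    // handle: always runs; maps either the value or the exception to a result of type U
    static String withHandle(int n1, int n2) {
        return modulus(n1, n2)
            .handle((value, ex) -> ex == null ? "ok:" + value : "failed")
            .join();
    }

    // whenComplete: observes the outcome, but an exception still propagates to the caller
    static boolean whenCompletePropagates(int n1, int n2) {
        try {
            modulus(n1, n2).whenComplete((value, ex) -> { /* observe only */ }).join();
            return false;
        } catch (CompletionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(withExceptionally(8, 4));
        System.out.println(withHandle(8, 4));
        System.out.println(whenCompletePropagates(8, 4));
    }
}
```

The sketch uses join() rather than get() only to avoid checked exceptions; join() reports failures as an unchecked CompletionException.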
Just like the other methods we have seen so far on the interface CompletionStage, both the methods handle(BiFunction<T, Throwable, U>) and whenComplete(BiConsumer<T, Throwable>) have their async counterparts, with and without the custom executor.
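As a small illustration of the async counterpart with a custom executor, the sketch below runs a handleAsync callback on a dedicated single-thread executor and returns the name of the thread that executed it. The class name AsyncHandleSketch and the thread name passed in are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncHandleSketch {
    // Runs the handle callback on a custom executor and reports the thread that ran it
    static String handlerThread(String threadName) {
        // Single-thread executor whose only thread carries the given name
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        try {
            return CompletableFuture
                .supplyAsync(() -> 42)
                .handleAsync((value, ex) -> Thread.currentThread().getName(), executor)
                .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(handlerThread("custom-handler"));
    }
}
```

The same pattern applies to whenCompleteAsync, which also has an overload that accepts an Executor.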
The following table summarizes the methods we covered in this article from the CompletableFuture class:
Method | Usage Description |
---|---|
exceptionally(Function<Throwable, T>) | Takes an instance of Function<Throwable, T>. The specified function is executed only when the prior task (or stage) encounters an exception. For normal operation, this method transparently passes along the value from the previous task in the pipeline. This method allows one to swallow exceptions |
handle(BiFunction<T, Throwable, U>) | Takes an instance of BiFunction<T, Throwable, U>. The specified function is called irrespective of whether or not an exception occurs. One of the two arguments to the specified function will be null, depending on whether the previous task (or stage) encountered an exception. The specified function must return a value. This method allows one to swallow exceptions |
whenComplete(BiConsumer<T, Throwable>) | Takes an instance of BiConsumer<T, Throwable>. The specified function is called irrespective of whether or not an exception occurs. One of the two arguments to the specified function will be null, depending on whether the previous task (or stage) encountered an exception. This method will propagate any thrown exceptions |
More to come in Part 4 of this article ...