PolarSPARC |
Java 8 CompletableFuture :: Part 2
Bhaskar S | 11/02/2018 |
In Part 1 of this series, we demonstrated some of the capabilities and nuances of CompletableFuture. We will continue our journey to illustrate the other features in CompletableFuture.
The following example demonstrates a pipeline with the ability to combine outputs from two asynchronous tasks that are independent of each other:
Executing the program from Listing.5 will generate the following output:
[1] [main] I'm Cool AND am Slick !!! [2] [main] I'm Smart AND am Nimble !!! [3] [ForkJoinPool.commonPool-worker-5] I'm Awesome AND am Fast !!! [4] [pool-1-thread-3] I'm Stunning AND am New !!! [5] [pool-2-thread-3] I'm Agile AND am Quick !!!
Re-running the program from Listing.5 few more times will generate the following output:
[1] [ForkJoinPool.commonPool-worker-3] I'm Cool AND am Slick !!! [2] [main] I'm Smart AND am Nimble !!! [3] [ForkJoinPool.commonPool-worker-3] I'm Awesome AND am Fast !!! java.util.concurrent.ExecutionException: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniAccept@1767e42a rejected from java.util.concurrent.ThreadPoolExecutor@1358721e[Shutting down, pool size = 2, active threads = 1, queued tasks = 0, completed tasks = 1] at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) at com.polarsparc.cf.CompletableFuture.Sample05.main(Sample05.java:71) Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniAccept@1767e42a rejected from java.util.concurrent.ThreadPoolExecutor@1358721e[Shutting down, pool size = 2, active threads = 1, queued tasks = 0, completed tasks = 1] at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055) at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825) at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355) at java.base/java.util.concurrent.CompletableFuture$UniCompletion.claim(CompletableFuture.java:568) at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:710) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:1186) at java.base/java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1208) at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) [5] [pool-2-thread-3] I'm Agile AND am Quick !!!
Notice the excepion in Output.7. This happens because the executor was shutdown before all the task submissions could be completed.
The following are some of the concepts in the context of the code in Listing.5:
BiFunction<T, U, R> :: A functional interface defined in the java.util.function package that is implemented by a class whose method takes in two input values (of types T and U) and generates an output value of type R
thenCombine(CompletionStage<U>, BiFunction<T, U, R>) :: This method takes two arguments - a reference to CompletionStage<U> (basically an instance of CompletableFuture) that will return a value of type U when it completes execution and an instance of a function of type BiFunction<T, U, R>. The function (in the second argument) is executed only after the prior task (or stage) in the pipeline completes execution (returning a value of type T) and the task (wrapped in the first argument) complete execution (returning a value of type U). The function takes in the values T and U and returns a result of type R. The tasks are executed using the default fork-join thread pool. The method returns an instance of CompletionStage
thenCombineAsync(CompletionStage<U>, BiFunction<T, U, R>) :: This method is similar to the method thenCombine(CompletionStage<U>, BiFunction<T, U, R>), except that the specified BiFunction<T, U, R> function is executed asynchronously using the default fork-join thread pool. The method returns an instance of CompletionStage
thenCombineAsync(CompletionStage<U>, BiFunction<T, U, R>, Executor) :: This method is similar to the method thenCombine(CompletionStage<U>, BiFunction<T, U, R>), except that the specified BiFunction<T, U, R> function is executed asynchronously using the specified custom executor. The method returns an instance of CompletionStage
When using a custom Executor, ensure the shutdown() method is invoked only after all the tasks have completed execution.
In the next example, we will demonstrate a pipeline with the ability to combine outputs from two asynchronous tasks where the next task in the chain is dependent on the result from the current task in the chain. In other words, the output from the current task is consumed by the next task in the chain as input, and returns an instance of CompletionStage that will generate the result (in the future) when that task completes execution.
This may seem mouthful and confusing - but we will break it down in the following section.
Executing the program from Listing.6 will generate the following output:
[1] [ForkJoinPool.commonPool-worker-5] I'm Cool & am SLICK !!! [2] [main] I'm Smart & am NIMBLE !!! [3] [ForkJoinPool.commonPool-worker-5] I'm Awesome & am FAST !!! [4] [pool-1-thread-3] I'm Awesome & am FAST !!!
The following are some of the concepts in the context of the code in Listing.6:
thenCompose(Function<T, CompletionStage<U>>) :: This method is defined on the interface CompletionStage and accepts an instance of a function of type Function<T, CompletionStage<U>>, which consumes a value (of type T) produced by the previous stage (or task) in the pipeline as the input argument to the function and returns an instance of CompletionStage<U>>. The *subtle* point here is that this callback method returns an instance of type CompletionStage<U>>. When this returned CompletionStage<U>> completes execution, it will return a value of type U.
thenComposeAsync(Function<T, CompletionStage<U>>) :: This method is similar to the method thenCompose(Function<T, CompletionStage<U>>), except that the specified Function<T, CompletionStage<U>> function is executed asynchronously using the default fork-join thread pool. The method returns an instance of CompletionStage
thenComposeAsync(Function<T, CompletionStage<U>>, Executor) :: This method is similar to the method thenComposeAsync(Function<T, CompletionStage<U>>) , except that the specified Function<T, CompletionStage<U>> function is executed asynchronously using the specified custom executor. The method returns an instance of CompletionStage
On one hand, this may seem similar to the method thenApply(Function<T, U>), which takes an input value (of type T) from the current task in the pipeline and generates a result (of type U) immediately, when the specified Function<T, U> completes execution. This is similar to a map() operation on Streams.
The following picture illustrates how thenApply(Function) works:
On the other hand, the method thenCompose(Function<T, CompletionStage<U>>) takes an input value (of type T) from the current task in the pipeline and returns an instance of type CompletionStage<U>>. When this returned CompletionStage<U>> completes execution (in the future), that task will generate the desired result (of type U). This is similar to a flatmap() operation on Streams.
The following picture illustrates how thenCompose(Function) works:
Note, if we had used the method thenApply(Function<T, CompletionStage<U>>) instead of the method thenCompose(Function<T, CompletionStage<U>>), we would get a return value of type CompletionStage<CompletionStage<U>>. Instead it is being flattened by using the method thenCompose(Function<T, CompletionStage<U>>).
Moving on to the next example, we will demonstrate the case where two separate asynchronous tasks each produce a value, which can then be consumed by the next task in the chain as inputs to generate a totally different value.
Executing the program from Listing.7 will generate the following output:
[1] [main] I am Cool and am Slick !!! [2] [ForkJoinPool.commonPool-worker-3] I am Fast and am Nimble !!! [3] [pool-1-thread-3] I am Stunning and am Quick !!!
The following are some of the concepts in the context of the code in Listing.7:
BiConsumer<T, U> :: A functional interface defined in the java.util.function package that accepts two input values (of types T and U) and does not generate any output
thenAcceptBoth(CompletionStage<U>, BiConsumer<T, U>) :: This method is defined on the interface CompletionStage and accepts an instance of type CompletionStage<U> as the first argument (let us refer to it as CF) and an instance of type BiConsumer<T, U> as the second argument. The function instance (the second argument) consumes two input values - the value (of type T) produced by the previous stage (or task) in the pipeline and the value (of type U) produced by CF, but produces no output. This method returns an instance of CompletionStage
thenAcceptBothAsync(CompletionStage<U>, BiConsumer<T, U>) :: This method is similar to the method thenAcceptBoth(CompletionStage<U>, BiConsumer<T, U>), except that the specified BiConsumer<T, U> function is executed asynchronously using the default fork-join thread pool. The method returns an instance of CompletionStage
thenAcceptBothAsync(CompletionStage<U>, BiConsumer<T, U>, Executor) :: This method is similar to the method thenAcceptBothAsync(CompletionStage<U>, BiConsumer<T, U>) , except that the specified BiConsumer<T, U> function is executed asynchronously using the specified custom executor. The method returns an instance of CompletionStage
The following table summarizes the methods we covered in this article from the CompletableFuture class:
Method | Usage Description |
---|---|
thenCombine(CompletionStage<U>, BiFunction<T, U, R>) | Takes an instance of type CompletionStage<U> (as the first argument) and an instance of type BiFunction<T, U, R> (as the second argument). The function in the second argument is executed once the prior task (or stage) in the pipeline as well as the task wrapped in the CompletionStage<U> (first argument) have completed execution. The function generates an output value of type R |
thenCombineAsync(CompletionStage<U>, BiFunction<T, U, R>) | Takes two input arguments of type CompletionStage<U> (first argument) and of type BiFunction<T, U, R> (second argument). The function in the second argument is executed asynchronously once the prior task (or stage) in the pipeline as well as the task wrapped in the first argument have completed execution. The function generates an output value of type R |
thenCompose(Function<T, CompletionStage<U>>) | Takes an instance of type Function<T, CompletionStage<U>> which consumes the value (of type T) produced by the previous stage (or task) in the pipeline as an input and returns an instance of type CompletionStage<U>>. We get an output value (of type U) when this returned CompletionStage<U>> completes execution |
thenComposeAsync(Function<T, CompletionStage<U>>) | Takes an instance of type Function<T, CompletionStage<U>> which is executed asynchronously. The function consumes the value (of type T) produced by the previous stage (or task) in the pipeline as an input and returns an instance of type CompletionStage<U>>. We get an output value (of type U) when this returned CompletionStage<U>> completes execution |
thenAcceptBoth(CompletionStage<U>, BiConsumer<T, U>) | Takes two arguments - an instance of type CompletionStage<U> and an instance of type BiConsumer<T, U>. The function in the second argument is executed once the prior task (or stage) in the pipeline as well as the task wrapped in the first argument have completed execution. No result is generated |
thenAcceptBothAsync(CompletionStage<U>, BiConsumer<T, U>) | Takes an instance of type CompletionStage<U> and an instance of type BiConsumer<T, U>. The function in the second argument is executed asynchronously once the prior task (or stage) in the pipeline as well as the task wrapped in the first argument have completed execution. No result is generated |
More to come in Part 3 of this article ...