PolarSPARC |
Java 8 CompletableFuture :: Part 1
Bhaskar S | 10/26/2018 |
Straight-through business process(es) typically involve multiple step(s) and todays Enterprises have many of them. Some of those step(s) could be time consuming and hence performed in parallel (asynchronously) and some in sequential (synchronous) fashion. In other words, a straight-through business process can be thought as a pipeline of task(s), some of which could be executed in parallel.
In Java, one could perform those asynchronous step(s) using Threads. However, one needs to carefully plan out the orchestration of the various step(s) in the business process without blocking and locking for optimal performance, which could be complex, error-prone, and difficult to reason about.
What if Java provided an out-of-the-box capability to chain a series of task(s), with some task(s) executing in parallel, without one having to write complex multi-threaded code ???
Please welcome CompletableFuture from Java 8!!!
NOTE: that we are referring to the tasks (or steps) within a single JVM and not distributed computing.
Lets jump right into some examples to illustrate the power and intricacies of CompletableFuture.
The following is one of the simplest examples that uses CompletableFuture:
Executing the program from Listing.1 will generate the following output:
[ForkJoinPool.commonPool-worker-3] I am Cool [ForkJoinPool.commonPool-worker-3] Am Awesome [pool-1-thread-1] And am Smart
The following are some of the concepts in the context of the code in Listing.1:
Future :: An interface that represents the result of an asynchronous task which may complete in the future. It provides the get() method to wait for the completion of the asynchronous task and return some result
CompletionStage :: An interface that represents the completion state of a task (or stage) in the chain (or pipeline) of tasks (may be asynchronous computations), that upon completion may trigger the next task (or stage) in the chain
CompletableFuture :: An implementation that supports a chain (or pipeline) of tasks (may be synchronous or asynchronous computations) that progresses from one task to the next, on the completion of the prior task(s). This class implements both the Future interface and the CompletionStage interface. By default, this class leverages the default fork-join thread pool instantiated by the JVM for executing task(s). It provides the get() method to wait for the completion of the asynchronous task and return some result
Runnable :: An interface that should be implemented by a class for it to be executed in a Thread
CompletableFuture.runAsync(Runnable) :: This method takes an instance of Runnable that is asynchronously executed as a task using the default fork-join thread pool. The method returns an instance of CompletableFuture
Supplier<T> :: A functional interface defined in the java.util.function package that is implemented by a class that generates a value of type T
CompletableFuture.supplyAsync(Supplier<T>) :: This method takes an instance of type Supplier<T> that will supply a value of type T when the task completes. The task is executed asynchronously using the default fork-join thread pool. This method returns an instance of CompletableFuture
Executor :: An interface that allows for execution of Runnable tasks
ExecutorService :: An interface that allows for asynchronous execution of tasks. It extends the interface Executor
Executors :: A factory class used in the creation of various types of ExecutorService implementations
Executors.newSingleThreadExecutor() :: Allows for the creation of an ExecutorService implementation that uses a single thread in the pool
CompletableFuture.supplyAsync(Supplier<T>, Executor) :: This method takes two arguments - an instance of type Supplier<T> and an instance of type Executor. When the task completes execution, it returns a value of type T. The task is executed asynchronously using the specified custom executor. This method returns an instance of CompletableFuture
As seen from Listing.1 above, there are two ways to initiate an asynchronous operation - using runAsync() or using supplyAsync().
In the next example, we will demonstrate the case where an asynchronous task produces some value, which can then be consumed by the next task in the chain.
Executing the program from Listing.2 will generate the following output:
[1] [main] I am Cool and am also Awesome [2] [main] I am New and am also Smart [3] [ForkJoinPool.commonPool-worker-3] I am Fast and am also Elegant [4] [pool-2-thread-2] I am Slick and am also Nimble
Re-running the program from Listing.2 once more will generate the following output:
[1] [ForkJoinPool.commonPool-worker-3] I am Cool and am also Awesome [2] [main] I am New and am also Smart [3] [ForkJoinPool.commonPool-worker-3] I am Fast and am also Elegant [4] [pool-2-thread-2] I am Slick and am also Nimble
Notice the change in the thread name of the first line between Output.2 and Output.3. We will explain that shortly in the following section.
The following are some of the concepts in the context of the code in Listing.2:
Consumer<T> :: A functional interface defined in the java.util.function package that accepts an input value of type T and does not generate any result
Executors.newFixedThreadPool(int) :: Allows for the creation of an ExecutorService implementation that uses a fixed number of threads in the pool
thenAccept(Consumer<T>) :: This method is defined on the interface CompletionStage and accepts an instance of type Consumer<T>, which consumes a value (of type T) produced by the previous stage (or task). This method is triggered upon the completion of the previous stage (or task). Think of this as a callback function for the previous task to invoke upon completion. The execution of the task can happen in one of the two ways - EITHER executed by the same thread as the prior task (or stage) from the default fork-join thread pool if the prior task has not yet completed before this task OR executed by the main thread (the calling thread) if the prior task has completed execution before this task. This is the reason why we see the difference between Output.2 and Output.3. This method returns an instance of CompletionStage
thenAcceptAsync(Consumer<T>) :: This method is similar to the method thenAccept(Consumer<T>), except that the specified Consumer<T> function is executed asynchronously using the default fork-join thread pool. The method returns an instance of CompletionStage
thenAcceptAsync(Consumer<T>, Executor) :: This method is similar to the method thenAcceptAsync(Consumer<T>), except that the specified Consumer <T> function is executed asynchronously using the specified custom executor. The method returns an instance of CompletionStage
The default fork-join thread pool is used in other cases, such as, the parallel Streams, etc., and is hard to
customize since it is created by the JVM. It is therefore better to create and use a custom Executor.
Moving on to the next example, we will demonstrate the case where an asynchronous task produces some value, which can then be consumed by the next task in the chain as input and then generate a totally different value. This is similar to a map() operation on Streams.
Executing the program from Listing.3 will generate the following output:
[1] [ForkJoinPool.commonPool-worker-3] I am Cool [1] I am Cool and AWESOME !!! [2] [main] I am New [2] I am New and SMART !!! [3] [ForkJoinPool.commonPool-worker-3] I am Fast [3] I am Fast and ELEGANT !!! [4] [pool-2-thread-1] I am Slick [4] I am Slick and NIMBLE !!!
The following are some of the concepts in the context of the code in Listing.3:
Function<T, U> :: A functional interface defined in the java.util.function package that accepts an input value of type T and generates an output result of type U. This as similar to the map() operation on a Stream
thenApply(Function<T, U>) :: This method is defined on the interface CompletionStage and accepts an instance of type Function<T, U>, which consumes the value (of type T) produced by the previous stage (or task) and generates an output value of type U. This method is triggered upon completion of the previous stage (or task). Think of this as a callback function for the previous task to invoke upon completion. The execution of the task can happen in one of the two ways - EITHER executed by the same thread as the prior task (or stage) from the default fork-join thread pool if the prior task has not yet completed before this task OR executed by the main thread (the calling thread) if the prior task has completed execution before this task. This method returns an instance of CompletionStage
thenApplyAsync(Function<T, U>) :: This method is similar to the method thenApply(Function<T, U>), except that the specified Function<T, U> function is executed asynchronously using the default fork-join thread pool. The method returns an instance of CompletionStage
thenApplyAsync(Function<T, U>, Executor) :: This method is similar to the method thenApplyAsync(Function<T, U>), except that the specified Function<T, U> function is executed asynchronously using the specified custom executor. The method returns an instance of CompletionStage
In the next example, we will demonstrate the case where an asynchronous task produces some value, that is then consumed by a second task in the chain to produce some output, which is finally consumed by a third task in the chain.
Executing the program from Listing.4 will generate the following output:
[1] I'm Cool and am Super AWESOME !!! [2] I'm Awesome and am Super COOL !!!
The first block of code in Listing.4 shows how one can chain the tasks explicitly, while the second block shows the same code in the fluent style.
The following table summarizes the methods we covered in this article from the CompletableFuture class:
Method | Usage Description |
---|---|
runAsync(Runnable) | Uses an instance of Runnable to start the pipeline. Does not generate any output value |
supplyAsync(Supplier<T>) | Uses an instance of Supplier<T> to start the pipeline. Generates an output value of type T |
thenAccept(Consumer<T>) | Uses an instance of Consumer<T> to accept a value of type T from the prior task in the pipeline. Does not generate any output value |
thenAcceptAsync(Consumer<T>) | Uses an instance of Consumer<T> to accept a value of type T from the prior task in the pipeline in an asynchronous fashion. Does not generate any output value |
thenApply(Function<T, U>) | Uses an instance of Function<T, U> to accept a value of type T from the prior task in the pipeline. Generates an output value of type U |
thenApplyAsync(Function<T, U>) | Uses an instance of Function<T, U> to accept a value of type T from the prior task in the pipeline in an asynchronous fashion. Generates an output value of type U |
More to come in Part 2 of this article ...