Tascalate Concurrent library version 0.5.2 is released and available in the Maven Central Repository.
The library was created to overcome numerous shortcomings of CompletableFuture, the standard (and only) implementation of the CompletionStage interface shipped with Java 8.
First and foremost, the library provides an implementation of CompletionStage that supports long-running blocking tasks (typically I/O-bound), unlike the Java 8 built-in CompletableFuture, which primarily supports computational tasks.
Why is CompletableFuture not enough?
There are several shortcomings associated with the CompletableFuture class implementation that complicate its usage for blocking tasks:
- The CompletableFuture.cancel() method does not interrupt the underlying thread; it merely puts the future into an exceptionally completed state. So if you use any blocking calls inside functions passed to thenApplyAsync / thenAcceptAsync / etc., these functions will run to the end and will never be interrupted. Please see "CompletableFuture can’t be interrupted" by Tomasz Nurkiewicz.
- By default, all *Async composition methods use ForkJoinPool.commonPool() unless an explicit Executor is specified. This thread pool is shared between all CompletableFuture-s, all parallel streams and all applications deployed on the same JVM. This hard-coded, unconfigurable thread pool is completely outside of our control and is hard to monitor and scale. Therefore you should always specify your own Executor.
- Additionally, the built-in Java 8 concurrency classes provide a pretty inconvenient API for combining several CompletionStage-s. The CompletableFuture.allOf / CompletableFuture.anyOf methods accept only CompletableFuture arguments; you have no mechanism to combine arbitrary CompletionStage-s without converting them to CompletableFuture first. Also, the return type of the aforementioned CompletableFuture.allOf is declared as CompletableFuture<Void>, hence you are unable to conveniently extract the individual results of each future supplied. CompletableFuture.anyOf is even worse in this regard; for more details, please read "CompletableFuture in Action" (see Shortcomings) by Tomasz Nurkiewicz.
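The first shortcoming is easy to reproduce with the JDK alone. The following is a minimal sketch: the class name CancelDemo is made up for illustration, and the 2-second sleep merely stands in for a blocking I/O call. It cancels a running task once through CompletableFuture and once through the plain Future returned by ExecutorService.submit, and reports whether the worker thread ever observed an interrupt.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Illustrative demo class (hypothetical name); uses only java.util.concurrent.
class CancelDemo {

    /** Returns true if the blocking task saw an interrupt shortly after cancel(true). */
    static boolean interruptedAfterCancel(boolean useCompletableFuture) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);

        Runnable blockingTask = () -> {
            started.countDown();
            try {
                Thread.sleep(2_000); // stands in for a blocking I/O call
            } catch (InterruptedException e) {
                interrupted.countDown();
            }
        };

        try {
            Future<?> future = useCompletableFuture
                ? CompletableFuture.runAsync(blockingTask, executor)
                : executor.submit(blockingTask);
            started.await();      // make sure the task is actually running
            future.cancel(true);  // request cancellation with interruption
            return interrupted.await(300, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow(); // clean up the worker thread
        }
    }

    public static void main(String[] args) {
        System.out.println("CompletableFuture interrupted: " + interruptedAfterCancel(true));
        System.out.println("Plain Future interrupted:      " + interruptedAfterCancel(false));
    }
}
```

With CompletableFuture the task keeps sleeping after cancel(true), so the first call reports false; the plain Future interrupts its worker, so the second call reports true.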
How to use?
Add Maven dependency:
<dependency>
    <groupId>net.tascalate.concurrent</groupId>
    <artifactId>net.tascalate.concurrent.lib</artifactId>
    <version>0.5.2</version>
</dependency>
What is inside?
1. Promise interface
The interface net.tascalate.concurrent.Promise may be best described by the formula:
Promise == CompletionStage + Future
I.e., it combines both the blocking Future API, including the cancel(boolean mayInterruptIfRunning) method, AND the composition capabilities of the CompletionStage API. Importantly, all composition methods of the CompletionStage API (thenAccept, thenCombine, whenComplete, etc.) are re-declared to return Promise as well.
You may notice that the Java 8 CompletableFuture implements both the CompletionStage AND Future interfaces as well.
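To make the formula concrete, here is a rough sketch of what such an interface could look like. PromiseSketch is a hypothetical name and this is not the actual source of net.tascalate.concurrent.Promise; it only illustrates how composition methods can be re-declared with a narrower (covariant) return type, so that call chains keep returning the combined type.

```java
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch, NOT the real net.tascalate.concurrent.Promise declaration.
interface PromiseSketch<T> extends Future<T>, CompletionStage<T> {

    // Same signature as CompletionStage.thenApply, but with a covariant return type.
    @Override
    <U> PromiseSketch<U> thenApply(Function<? super T, ? extends U> fn);

    // Same idea for thenAccept.
    @Override
    PromiseSketch<Void> thenAccept(Consumer<? super T> action);

    // The remaining CompletionStage methods would be re-declared in the same way.
}
```

Because every composition step returns the combined type again, cancel(boolean) and get() stay reachable at any point in a chain without casting.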
2. CompletableTask
This is why the project was started in the first place. net.tascalate.concurrent.CompletableTask is the implementation of the net.tascalate.concurrent.Promise API for long-running blocking tasks. There are several options to create a CompletableTask:
- CompletableTask.runAsync(Runnable runnable, Executor executor)
You may submit a Runnable to the Executor:
    Promise<Void> p = CompletableTask.runAsync(
        this::someIoBoundMethod, myExecutor
    );
You may notice the similarity with the CompletableFuture.runAsync(Runnable runnable, Executor executor) method.
- CompletableTask.supplyAsync(Supplier<U> supplier, Executor executor)
Alternatively, you may submit a Supplier to the Executor:
    Promise<SomeValue> p = CompletableTask.supplyAsync(() -> {
        return blockingCalculationOfSomeValue();
    }, myExecutor);
Again, you can notice the direct analogy with the CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor) method.
- CompletableTask.asyncOn(Executor executor)
This unit operation returns a resolved no-value Promise that is “bound” to the specified executor. I.e., any function passed to the composition methods of Promise (like thenApplyAsync / thenAcceptAsync / whenCompleteAsync, etc.) will be executed using this executor unless the executor is overridden via an explicit composition method parameter. Moreover, any recursively nested composition calls will use the same executor if it’s not redefined via an explicit composition method parameter:
    CompletableTask
        .asyncOn(myExecutor)
        .thenApplyAsync(myValueGenerator)
        .thenAcceptAsync(myConsumer)
        .thenRunAsync(myAction);
All of myValueGenerator, myConsumer and myAction will be executed using myExecutor.
- CompletableTask.complete(T value, Executor executor)
Same as above, but the starting point is a resolved Promise with the specified value:
    CompletableTask
        .complete("Hello!", myExecutor)
        .thenApplyAsync(myMapper)
        .thenApplyAsync(myTransformer)
        .thenAcceptAsync(myConsumer)
        .thenRunAsync(myAction);
All of myMapper, myTransformer, myConsumer and myAction will be executed using myExecutor.
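The idea of a stage that stays "bound" to an executor can be illustrated with the JDK alone on Java 9 or later, where CompletableFuture exposes defaultExecutor() and newIncompleteFuture() as extension points. The sketch below is only an analogy and makes no claim about how CompletableTask is implemented; BoundFuture, completed and threadNamesOfTwoAsyncSteps are hypothetical names chosen for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical JDK-only illustration (requires Java 9+); not part of Tascalate Concurrent.
class BoundFuture<T> extends CompletableFuture<T> {
    private final Executor executor;

    BoundFuture(Executor executor) {
        this.executor = executor;
    }

    /** Starting point: an already resolved future bound to the given executor. */
    static <T> BoundFuture<T> completed(T value, Executor executor) {
        BoundFuture<T> future = new BoundFuture<>(executor);
        future.complete(value);
        return future;
    }

    // Used by all *Async methods that are called without an explicit Executor.
    @Override
    public Executor defaultExecutor() {
        return executor;
    }

    // Dependent stages are created through this hook, so the binding propagates.
    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new BoundFuture<>(executor);
    }

    /** Runs two chained async steps and returns the names of the threads that ran them. */
    static String threadNamesOfTwoAsyncSteps() {
        ExecutorService myExecutor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "my-executor");
            t.setDaemon(true);
            return t;
        });
        try {
            return BoundFuture.completed("ignored", myExecutor)
                .thenApplyAsync(v -> Thread.currentThread().getName())
                .thenApplyAsync(first -> first + "," + Thread.currentThread().getName())
                .join();
        } finally {
            myExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadNamesOfTwoAsyncSteps());
    }
}
```

Both steps run on the thread named my-executor even though no Executor is passed to thenApplyAsync, which mirrors the propagation behaviour described for asyncOn and complete.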
Most importantly, all composed promises support true cancellation (including interrupting the thread) for the functions supplied as arguments:
    Promise<?> p1 = CompletableTask
        .asyncOn(myExecutor)
        .thenApplyAsync(myValueGenerator)
        .thenAcceptAsync(myConsumer);
    Promise<?> p2 = p1.thenRunAsync(myAction);
    ...
    p1.cancel(true);
In the example above, myConsumer will be interrupted if it is already in progress. Both p1 and p2 will be resolved faulty: p1 with a CancellationException and p2 with a CompletionException.
It is important to mention that CompletableTask supports interrupting the execution thread, but the actual behavior depends on the concrete Executor implementation: for example, ThreadPoolExecutor will truly interrupt the underlying thread when cancellation is requested, but ForkJoinPool will not.
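How can cancel(true) reach the worker thread at all? One common JDK-only technique is to keep the Future handle returned by ExecutorService.submit and forward the cancellation to it. The sketch below shows that technique under stated assumptions: InterruptibleFuture, callAsync and cancelInterruptsWorker are hypothetical names, the sleep stands in for blocking I/O, and CompletableTask's real implementation may well differ.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of interruptible cancellation; not the library's code.
class InterruptibleFuture<T> extends CompletableFuture<T> {
    private volatile Future<?> worker; // handle used to interrupt the running task

    static <T> InterruptibleFuture<T> callAsync(Callable<T> task, ExecutorService executor) {
        InterruptibleFuture<T> result = new InterruptibleFuture<>();
        Runnable job = () -> {
            try {
                result.complete(task.call());
            } catch (Throwable error) {
                result.completeExceptionally(error); // no-op if already cancelled
            }
        };
        result.worker = executor.submit(job);
        return result;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = super.cancel(mayInterruptIfRunning); // CancellationException
        Future<?> running = worker;
        if (running != null) {
            running.cancel(mayInterruptIfRunning); // forwards the interrupt request
        }
        return cancelled;
    }

    /** Returns true if cancel(true) both cancelled the future and interrupted the task. */
    static boolean cancelInterruptsWorker() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        try {
            InterruptibleFuture<String> future = callAsync(() -> {
                started.countDown();
                try {
                    Thread.sleep(2_000); // stands in for a blocking I/O call
                    return "done";
                } catch (InterruptedException e) {
                    interrupted.countDown();
                    throw e;
                }
            }, executor);
            started.await();
            future.cancel(true);
            return future.isCancelled() && interrupted.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("interrupted on cancel: " + cancelInterruptsWorker());
    }
}
```

Note that this sketch only covers the root task: stages derived from it via thenApply and friends are ordinary CompletableFuture-s again, which is exactly the gap that composed promises with true cancellation are meant to close.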
3. Utility class Promises
First things first, the class provides several methods to conveniently create promises:
- It’s possible to convert a ready value to a successfully resolved net.tascalate.concurrent.Promise with Promises.success(T value):
    Promise<String> p = Promises.success("Tascalate");
- Similarly, the next method, Promises.failure(Throwable exception), creates a faulty resolved net.tascalate.concurrent.Promise:
    Promise<?> err = Promises.failure(new IllegalStateException());
- Naturally, there is a way to convert an arbitrary CompletionStage to a net.tascalate.concurrent.Promise with Promises.from(CompletionStage stage):
    CompletionStage<String> stage = ...; // Get CompletionStage
    Promise<String> p = Promises.from(stage);
But most importantly, the class provides convenient methods to combine several CompletionStage-s:
    Promises.all([boolean cancelRemaining], CompletionStage<? extends T>... promises)
    Promises.any([boolean cancelRemaining], CompletionStage<? extends T>... promises)
    Promises.anyStrict([boolean cancelRemaining], CompletionStage<? extends T>... promises)
    Promises.atLeast(int minResultsCount, [boolean cancelRemaining], CompletionStage<? extends T>... promises)
    Promises.atLeastStrict(int minResultsCount, [boolean cancelRemaining], CompletionStage<? extends T>... promises)
These methods may (and I would say “should”) be used instead of the CompletableFuture.allOf and CompletableFuture.anyOf methods, and here is why:
- When a method returns a single result, its result type is Promise<T>, where the type argument T is the most common supertype of the parameters’ type arguments, unlike the CompletableFuture<Object> that is returned by CompletableFuture.anyOf.
- When a method returns multiple results, its result type is Promise<List<T>>, where the type argument T is the most common supertype of the parameters’ type arguments AND the result of each successfully completed promise is available at the corresponding list index, unlike the CompletableFuture<Void> that is returned by CompletableFuture.allOf.
- There are several overloads of the atLeast* methods, a generalization of the any* methods, useful when you have to collect N out of M results (N < M). No such functionality is available in Java 8 out of the box.
- Once the resulting Promise is resolved (either successfully or faulty), all remaining promises may be cancelled, either when the explicit cancelRemaining parameter is true or by default when this parameter is omitted.
* The explicit cancelRemaining parameter is currently available only in the master branch; this functionality is planned for release 0.5.3.
- There are separate non-strict and strict overloads of the methods. The difference is in how errors are tolerated when you don’t need all of the promises passed to complete successfully in order to resolve the resulting Promise. Strict versions will resolve the resulting Promise faulty on the first error; you can use whichever best suits your application logic. No such option is available in Java 8 out of the box.
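For comparison, this is roughly the boilerplate needed to get a list of results out of plain CompletableFuture.allOf. It is a JDK-only sketch under stated assumptions: AllOfWorkaround and allAsList are hypothetical names, and the helper covers only the "all" case, without cancellation of remaining stages or any of the atLeast / strict variants.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical helper showing the manual workaround; not part of any library.
class AllOfWorkaround {

    @SafeVarargs
    static <T> CompletableFuture<List<T>> allAsList(CompletionStage<? extends T>... stages) {
        // allOf accepts only CompletableFuture, so every stage must be converted first.
        List<CompletableFuture<? extends T>> futures = new ArrayList<>();
        for (CompletionStage<? extends T> stage : stages) {
            futures.add(stage.toCompletableFuture());
        }
        // allOf yields CompletableFuture<Void>, so the results are collected by hand.
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> {
                List<T> results = new ArrayList<>();
                for (CompletableFuture<? extends T> future : futures) {
                    results.add(future.join()); // already completed, does not block
                }
                return results;
            });
    }

    public static void main(String[] args) {
        CompletableFuture<String> first = CompletableFuture.completedFuture("a");
        CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> "b");
        // Results keep the order of the arguments.
        System.out.println(allAsList(first, second).join()); // prints [a, b]
    }
}
```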
4. Extensions to ExecutorService API
It’s not mandatory to use any specific subclass of java.util.concurrent.Executor with net.tascalate.concurrent.CompletableTask; you may use any implementation that supports thread interruption. However, some may find it beneficial to have a Promise-aware java.util.concurrent.ExecutorService API. Below is a list of the related classes/interfaces:
- Interface net.tascalate.concurrent.TaskExecutorService
A specialization of ExecutorService that uses net.tascalate.concurrent.Promise as the result of its submit(...) methods:
    TaskExecutorService executor = ...; // Get concrete TaskExecutorService
    Promise<String> promise = executor
        .submit(() -> someLongRunningMethodWithStringResult());
- Class net.tascalate.concurrent.ThreadPoolTaskExecutor
A subclass of the standard ThreadPoolExecutor that implements the net.tascalate.concurrent.TaskExecutorService interface:
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(
        4, 4, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>()
    );
    Promise<Integer> promise = executor
        .submit(() -> someLongRunningMethodWithStringResult())
        .thenApply(String::length);
- Class net.tascalate.concurrent.TaskExecutors
A drop-in replacement for the Executors utility class that returns various useful implementations of net.tascalate.concurrent.TaskExecutorService instead of the standard ExecutorService:
    TaskExecutorService e1 = TaskExecutors.newFixedThreadPool(4);
    TaskExecutorService e2 = TaskExecutors.newCachedThreadPool();
    TaskExecutorService e3 = TaskExecutors.newSingleThreadExecutor();
    ...
    @Resource
    ManagedExecutorService managedExecutorService; // CDI injection
    ...
    TaskExecutorService tes = TaskExecutors.adapt(managedExecutorService);
Acknowledgements
Internal implementation details are greatly inspired by the work done by Lukáš Křečan. I want to express my great gratitude to Lukáš for his easy-to-follow, clean and bullet-proof code that served as a blueprint for my implementation.