Valery Silaev's Blog

if it ain't broken we'll break it


Tascalate Concurrent library version 0.5.2 is released and available in the Maven Central Repository.

The library was created to overcome numerous shortcomings of the standard (and the only) implementation of the CompletionStage interface shipped with Java8 — CompletableFuture.
First and foremost, the library provides implementation of the CompletionStage that supports long-running blocking tasks (typically, I/O bound) – unlike Java 8 built-in implementation, CompletableFuture, that is primarily supports computational tasks.

Why a CompletableFuture is not enough?

There are several shortcomings associated with the CompletableFuture class implementation that complicate its usage for blocking tasks:

  1. CompletableFuture.cancel() method does not interrupt underlying thread; it merely puts future to exceptionally completed state. So if you use any blocking calls inside functions passed to thenApplyAsync / thenAcceptAsync / etc these functions will run till the end and never will be interrupted. Please see CompletableFuture can’t be interrupted by Tomasz Nurkiewicz.
  2. By default, all *Async composition methods use ForkJoinPool.commonPool() (see here) unless explicit Executor is specified. This thread pool shared between all CompletableFuture-s, all parallel streams and all applications deployed on the same JVM. This hard-coded, unconfigurable thread pool is completely outside of our control, hard to monitor and scale. Therefore you should always specify your own Executor.
  3. Additionally, built-in Java 8 concurrency classes provides pretty inconvenient API to combine several CompletionStage-s. CompletableFuture.allOf / CompletableFuture.anyOf methods accept only CompletableFuture as arguments; you have no mechanism to combine arbitrary CompletionStage-s without converting them to CompletableFuture first. Also, the return type of the aforementioned CompletableFuture.allOf is declared as CompletableFuture – hence you are unable to extract conveniently individual results of the each future supplied. CompletableFuture.anyOf is even worse in this regard; for more details please read on here: CompletableFuture in Action (see Shortcomings) by Tomasz Nurkiewicz

How to use?

Add Maven dependency:

<dependency>
    <groupId>net.tascalate.concurrent</groupId>
    <artifactId>net.tascalate.concurrent.lib</artifactId>
    <version>0.5.2</version>
</dependency>

What is inside?

1. Promise interface

The interface net.tascalate.concurrent.Promise may be best described by the formula:

Promise == CompletionStage + Future

I.e., it combines both blocking Future’s API, including cancel(boolean mayInterruptIfRunning) method, AND composition capabilities of CompletionStage’s API. Importantly, all composition methods of CompletionStage API (thenAccept, thenCombine, whenComplete etc.) are re-declared to return Promise as well.

You may notice, that Java8 CompletableFuture implements both CompletionStage AND Future interfaces as well.

2. CompletableTask

This is why this project was ever started. net.tascalate.concurrent.CompletableTask is the implementation of the net.tascalate.concurrent.Promise API for long-running blocking tasks. There are several options to create a CompletableTask:

  • CompletableTask.runAsync(Runnable runnable, Executor executor)
    You may submit Runnable to the Executor:

    Promise<Void> p = CompletableTask.runAsync(
      this::someIoBoundMethod, myExecutor
    );
    

    You may notice similarities with CompletableFuture.runAsync(Runnable runnable, Executor executor) method.

  • CompletableTask.supplyAsync(Supplier<U> supplier, Executor executor)

    Alternatively, you may submit Supplier to the Executor:

    Promise<SomeValue> p = CompletableTask.supplyAsync(() -> {
      return blockingCalculationOfSomeValue();
    }, myExecutor);
    

    Again, you can notice direct analogy with CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor) method

  • CompletableTask.asyncOn(Executor executor)
    This unit operation returns a resolved no-value Promise that is “bound” to the specified executor. I.e. any function passed to composition methods of Promise (like thenApplyAsync / thenAcceptAsync / whenCompleteAsync etc.) will be executed using this executor unless executor is overridden via explicit composition method parameter. Moreover, any recursively nested composition calls will use the same executor, if it’s not redefined via explicit composition method parameter:

    CompletableTask
      .asyncOn(myExecutor)
      .thenApplyAsync(myValueGenerator)
      .thenAcceptAsync(myConsumer)
      .thenRunAsync(myAction);
    

    All of myValueGenerator, myConsumer, myActtion will be executed using myExecutor.

  • CompletableTask.complete(T value, Executor executor)
    Same as above, but the starting point is a resolved Promise with the specified value:

    CompletableTask
       .complete("Hello!", myExecutor)
       .thenApplyAsync(myMapper)
       .thenApplyAsync(myTransformer)   
       .thenAcceptAsync(myConsumer)
       .thenRunAsync(myAction);
    

    All of myMapper, myTransformer, myConsumer, myActtion will be executed using myExecutor.

Most importantly, all composed promises support true cancellation (incl. interrupting thread) for the functions supplied as arguments:

Promise<?> p1 =
CompletableTask
  .asyncOn(myExecutor)
  .thenApplyAsync(myValueGenerator)
  .thenAcceptAsync(myConsumer);
  
Promise<?> p2 = p1.thenRunAsync(myAction);
...
p1.cancel(true);

In the example above myConsumer will be interrupted if already in progress. Both p1 and p2 will be resolved faulty: p1 with a CancellationException and p2 with a CompletionException.

It important to mention, that CompletableTask supports interrupting execution thread, but the actual behavior depends on the concrete Executor implementation: for example, the ThreadPoolExecutor will truly interrupt underlying thread when cancellation is requested but the ForkJoinPool will not.

3. Utility class Promises

First things first, the class provides several method to conveniently create promises:

  • It’s possible to convert to a ready value to successfully resolved net.tascalate.concurrent.PromisePromises.success(T value):
    Promise<String> p = Promises.success("Tascalate");
    
  • Similarly, the next method creates faulty resolved net.tascalate.concurrent.PromisePromises.failure(Throwable exception):
    Promise<?> err = Promises.failure(new IllegalStateException());
    
  • Naturally, there is a way to convert arbitrary CompletionStage to net.tascalate.concurrent.Promise
    Promises.from(CompletionStage stage):

    CompletionStage<String> stage = ...; // Get CompletionStage
    Promise<String> = Promises.from(stage);
    

But most important, the class provides convenient methods to combine several CompletionStage-s:

Promise.all([boolean cancelRemaining], CompletionStage<? extends T>... promises)
Promises.any([boolean cancelRemaining], CompletionStage<? extends T>... promises)
Promises.anyStrict([boolean cancelRemaining], CompletionStage<? extends T>... promises)
Promises.atLeast(int minResultsCount, [boolean cancelRemaining], CompletionStage<? extends T>... promises)
Promises.atLeastStrict(int minResultsCount, [boolean cancelRemaining], CompletionStage<? extends T>... promises)

These methods may (and I would say “should”) be used instead of CompletableFuture.all and CompletableFuture.any methods and here is why:

  1. When method returns single result, its result type is Promise where type argument T is a most common supertype of parameters’ type arguments — unlike Promise that is returned via CompletableFuture.any.
  2. When method returns multiple results, its result type is Promise where type argument T is a most common supertype of parameters’ type arguments AND the result of the each successfully completed promises is available at the corresponding list index — unlike Promise that is returned via CompletableFuture.all.
  3. There are several overloads for atLeast* methods — the generalization of any* methods, useful when you have to collect N out of M, N <M results. No such functionality is available in Java8 out of the box.
  4. Once resulting Promise is resolved (either successfully or faulty) all remaining promises may be cancelled — either when explicit cancelRemaining parameter is true or by default, when this parameter is omitted.
    *Explicit cancelRemaining parameter is available currently only in current master branch, this functionality is planned for release 0.5.3
  5. There are separate non-strict vs strict overloads of methods. The difference is how to tolerate errors even if you don’t need all promises passed to be completed successfully to resolve resulting Promise. Strict versions will resolve resulting Promise faulty on the first error — you can use whatever suits best for your application logic. No such option is available in Java8 out of the box.

4. Extensions to ExecutorService API

It’s not mandatory to use any specific subclasses of java.util.concurrent.Executor with the net.tascalate.concurrent.CompletableTask – you may use any implementation that supports thread interruption. However, someone may find beneficial to have a Promise-aware java.util.concurrent.ExecutorService API. Below is a list of related classes/interfaces:

  • Interface net.tascalate.concurrent.TaskExecutorService
    Specialization of the ExecutorService that uses net.tascalate.Promise as a result of submit(...) methods.:

    TaskExecutorService executor = ...; // Get concrete TaskExecutorService
    Promise<String> promise = executor
      .submit( () -> someLongRunningMethodWithStringResult() );
    
  • Class net.tascalate.concurrent.ThreadPoolTaskExecutor
    A subclass of the standard ThreadPoolExecutor that implements net.tascalate.concurrent.TaskExecutorService interface.

    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(
      4, 4, 0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>()
    );
    Promise<Integer> promise = executor
      .submit( () -> someLongRunningMethodWithStringResult() )
      .thenApply(String::length);
    
  • Class net.tascalate.concurrent.TaskExecutors
    A drop-in replacement for the Executors utility class that returns various useful implementations of net.tascalate.concurrent.TaskExecutorService instead of the standard ExecutorService.

    TaskExecutorService e1 = TaskExecutors.newFixedThreadPool(4);
    TaskExecutorService e2 = TaskExecutors.newCachedThreadPool();
    TaskExecutorService e3 = TaskExecutors.newSingleThreadExecutor();
    ...
    @Resource
    ManagedExecutorService managedExecutorService; // CDI injection
    ...
    TaskExecutorService tes = Executors.adapt(managedExecutorService);
    

Acknowledgements

Internal implementation details are greatly inspired by the work done by Lukáš Křečan. I want to express my great gratitude to Lukáš for his easy-to-follow, clean and bullet-proof code that served as blueprint for my implementation.

Leave a comment