Valery Silaev's Blog

if it ain't broken we'll break it

Tascalate Concurrent library version 0.5.3 is released and available in the Maven Central Repository.

As promised, this release adds an explicit cancelRemaining parameter to the overloaded Promises combinator methods like all / any / atLeast and the corresponding *Strict variants. This parameter specifies whether the remaining pending promises should be cancelled once the result is known. When omitted, the default value is cancelRemaining = true.
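
To make the effect of the flag concrete, here is a minimal sketch written against plain CompletableFuture rather than the library itself. The helper firstOf is hypothetical: it is not the library's any method and makes no claim about how Promises implements it; it only illustrates what cancelling the remaining promises means.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class CancelRemainingSketch {
    /**
     * Hypothetical helper (not a library API): completes with the first
     * successful result and, if requested, cancels the sources still pending.
     */
    static <T> CompletableFuture<T> firstOf(boolean cancelRemaining,
                                            List<CompletableFuture<T>> sources) {
        CompletableFuture<T> result = new CompletableFuture<>();
        AtomicInteger failures = new AtomicInteger();
        for (CompletableFuture<T> source : sources) {
            source.whenComplete((value, error) -> {
                if (error == null) {
                    result.complete(value);            // first success wins
                } else if (failures.incrementAndGet() == sources.size()) {
                    result.completeExceptionally(error); // every source failed
                }
            });
        }
        if (cancelRemaining) {
            // Once the outcome is known, the other sources are of no use.
            result.whenComplete((value, error) ->
                sources.forEach(s -> s.cancel(true)));
        }
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> fast = new CompletableFuture<>();
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> any = firstOf(true, Arrays.asList(fast, slow));
        fast.complete("done");
        System.out.println(any.join() + ", slow cancelled: " + slow.isCancelled());
    }
}
```

Passing false instead leaves the slower sources running, which is useful when their results are still needed elsewhere.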

Besides some important bug fixes, this release introduces a new class: DependentPromise. Let’s review why you need it in your day-to-day asynchronous code development.

You should know that once you cancel a Promise, all Promises that depend on it are completed with a CompletionException wrapping a CancellationException. This is standard behavior, and CompletableFuture works the same way.
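
The CompletableFuture side of this claim is easy to check with the JDK alone. The small demo below (the class and method names are made up for illustration) cancels a source stage and reports the exception a dependent stage ends up with.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class DownstreamCancellationDemo {
    /** Cancels the source stage and returns the failure seen by its dependent. */
    static Throwable failureSeenByDependent() {
        CompletableFuture<String> original = new CompletableFuture<>();
        CompletableFuture<Integer> derived = original.thenApply(String::length);

        original.cancel(true); // cancel the stage that others depend on

        try {
            derived.join();
            return null; // not reached: derived completed exceptionally
        } catch (CompletionException ex) {
            return ex;   // the cause is the source's CancellationException
        }
    }

    public static void main(String[] args) {
        Throwable ex = failureSeenByDependent();
        System.out.println(ex.getClass().getSimpleName()
            + " caused by " + ex.getCause().getClass().getSimpleName());
    }
}
```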

However, when you cancel a derived Promise, the original Promise is not cancelled:

Promise<?> original = CompletableTask
  .supplyAsync(() -> someIoBoundMethod(), myExecutor);
Promise<?> derived = original
  .thenRunAsync(() -> someMethod() );
...
derived.cancel(true);

So if you cancel derived above, its Runnable, which wraps someMethod, is interrupted. However, the original promise is not cancelled, and someIoBoundMethod keeps running. This is not always the desired behavior. Consider the following method:

public Promise<DataStructure> loadData(String url) {
   return CompletableTask
          .supplyAsync( () -> loadXml(url) )
          .thenApplyAsync( xml -> parseXml(xml) ); 
}

...
Promise<DataStructure> p = loadData("http://someserver.com/rest/ds");
...
if (someCondition()) {
  // Only the second promise (parseXml) is canceled; loadXml keeps running.
  p.cancel(true);
}
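
The one-way nature of cancellation can be reproduced with plain CompletableFuture as well. The demo below uses only JDK classes (its class and method names are made up for illustration) and shows that cancelling a derived stage leaves the stage it was built from untouched.

```java
import java.util.concurrent.CompletableFuture;

final class UpstreamCancellationDemo {
    /** Cancels only the derived stage and reports whether the original was cancelled too. */
    static boolean originalCancelledAfterDerivedCancel() {
        CompletableFuture<String> original = new CompletableFuture<>();
        CompletableFuture<Integer> derived = original.thenApply(String::length);

        derived.cancel(true); // clients typically hold only this reference

        return original.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println("original cancelled: "
            + originalCancelledAfterDerivedCancel());
    }
}
```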

Clients of loadData see only the derived promise, and once they decide to cancel it, they expect whichever of loadXml and parseXml has not yet completed to be interrupted. To address this issue, the library provides the DependentPromise class:

public Promise<DataStructure> loadData(String url) {
   return DependentPromise
          .from(CompletableTask.supplyAsync( () -> loadXml(url) ))
          .thenApplyAsync( xml -> parseXml(xml), true ); 
}

...
Promise<DataStructure> p = loadData("http://someserver.com/rest/ds");
...
if (someCondition()) {
  // Now the whole chain is canceled.
  p.cancel(true);
}
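
The idea behind the extra boolean argument can be sketched with plain CompletableFuture. Everything here is an assumption made for illustration: thenApplyEnlisting is a hypothetical helper, not a DependentPromise method, and the library's actual implementation may differ. The sketch only shows the principle of a derived stage remembering its origin and cancelling it too.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class CancelOriginSketch {
    /**
     * Hypothetical helper (not a library API): like thenApply, but the
     * returned future optionally cancels its origin when it is cancelled.
     */
    static <T, R> CompletableFuture<R> thenApplyEnlisting(
            CompletableFuture<T> origin,
            Function<? super T, ? extends R> fn,
            boolean enlistOrigin) {
        CompletableFuture<R> result = new CompletableFuture<R>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                boolean cancelled = super.cancel(mayInterruptIfRunning);
                if (cancelled && enlistOrigin) {
                    origin.cancel(mayInterruptIfRunning); // propagate upstream
                }
                return cancelled;
            }
        };
        // Forward the outcome of the ordinary dependent stage to the wrapper.
        origin.<R>thenApply(fn).whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> origin = new CompletableFuture<>();
        CompletableFuture<Integer> derived =
            thenApplyEnlisting(origin, String::length, true);
        derived.cancel(true);
        System.out.println("origin cancelled: " + origin.isCancelled());
    }
}
```

Note that with a plain CompletableFuture, cancel only marks the stage as cancelled and never interrupts a running thread, so this sketch demonstrates propagation only.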

DependentPromise overloads methods like thenApply / thenRun / thenAccept / thenCombine etc. with an additional argument:

  • if the method accepts no other CompletionStage, like thenApply / thenRun / thenAccept etc., then it’s a boolean flag, enlistOrigin, that specifies whether or not the original Promise should be enlisted for cancellation.
  • if the method accepts another CompletionStage, like thenCombine / applyToEither / thenAcceptBoth etc., then it’s a set of PromiseOrigin enum values that specifies whether the original Promise and/or the CompletionStage supplied as an argument should be enlisted for cancellation along with the resulting promise.

For example:

public Promise<DataStructure> loadData(String url) {
   return DependentPromise
          .from(CompletableTask.supplyAsync( () -> loadXml(url + "/source1") ))
          .thenCombine( 
              CompletableTask.supplyAsync( () -> loadXml(url + "/source2") ), 
              (xml1, xml2) -> Arrays.asList(xml1, xml2),
              PromiseOrigin.ALL
          )
          .thenApplyAsync( xmls -> parseXmlsList(xmls), true ); 
}
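
For two-input methods, the set of origins simply selects which of the inputs get cancelled together with the result. The sketch below shows that selection with plain CompletableFuture. The Origin enum and the combineEnlisting helper are hypothetical stand-ins invented for this example; they are not the library's PromiseOrigin or thenCombine, and the real implementation may work differently.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

final class CombineOriginsSketch {
    /** Hypothetical stand-in for an enum naming the two inputs of a combinator. */
    enum Origin { THIS, PARAM }

    /** Hypothetical helper: thenCombine whose result cancels the selected inputs. */
    static <T, U, R> CompletableFuture<R> combineEnlisting(
            CompletableFuture<T> self,
            CompletableFuture<U> other,
            BiFunction<? super T, ? super U, ? extends R> fn,
            Set<Origin> origins) {
        CompletableFuture<R> result = new CompletableFuture<R>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                boolean cancelled = super.cancel(mayInterruptIfRunning);
                if (cancelled) {
                    if (origins.contains(Origin.THIS)) {
                        self.cancel(mayInterruptIfRunning);
                    }
                    if (origins.contains(Origin.PARAM)) {
                        other.cancel(mayInterruptIfRunning);
                    }
                }
                return cancelled;
            }
        };
        // Forward the outcome of the ordinary combined stage to the wrapper.
        self.<U, R>thenCombine(other, fn).whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        CompletableFuture<String> both = combineEnlisting(
            first, second, (a, b) -> a + b, EnumSet.allOf(Origin.class));
        both.cancel(true);
        System.out.println(first.isCancelled() + " " + second.isCancelled());
    }
}
```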

Please note that in the planned release 0.5.4 there will be a new default method, dependent, in the Promise interface that serves the same purpose and allows chained calls to be written:

public Promise<DataStructure> loadData(String url) {
   return CompletableTask
          .supplyAsync( () -> loadXml(url) )
          .dependent()
          .thenApplyAsync( xml -> parseXml(xml), true ); 
}
