wissel.net

Usability - Productivity - Business - The web - Singapore & Twins

Async Java with vert.x


I wrote about more modern Java syntax and streams before.
There is more to it. Non Blocking I/O and Event Loops allow for
better performance. It's not a magic bullet, some readjustment is required

Adjusting methods, exceptions and return values

Initially it might look daunting, but the adjustments are not too big. Let's look at some examples. A classic Java method looks like this:

String someResult throws DidnWorkException {
    // Working code goes here
    if (someCondition) {
        throw new DidnWorkException();
    }
    return "It worked";
}

Its asynchronous counter-part looks like this:

Future<String> someResult() {
    return Future.future(promise -> {
        // Working code goes here
        if (someCondition) {
            promise.fail("It didn't work"); // Could use a Throwable too
        } else {
            promise.complete("It worked");
        }
    });
}

Update

Paulo Lopes, one of the Redhat vert.x core maintainers, suggested to restructure the method to make it clearer (and eliminate a lambda). Here is his take:

Future<String> someResult() {
  // we promise that we will write something in the future
  Promise<String> promise = Promise.promise();

  // Working code goes here (it can be an asynchronous call too...)
  if (someCondition) {
    promise.fail("It didn't work"); // Could use a Throwable too
  } else {
    promise.complete("It worked");
  }

  // we return the future (read-side) of the promise
  return promise.future();
}

His comment: It doesn't make much of a difference in terms of executed code, it's more of a readability pattern. You now read the code as if it was sequentially which makes it easier to reason (for some of us)

The main difference: an async function does not throw errors, but return successed or failures.
A failure will always return a Throwable,
even if you only provided a String as error explanation. When the body of your method might throw something, you wrap it in a try/catch as you are used to. Calling such a method is slightly different. In classic Java you code:

try {
    System.out.println(someResult());
} catch (Exception e) {
    logger.error(e);
}

while the async call looks like this:

someResult()
  .onFailure(logger::error)  //Java8 style shortcut for (e -> logger.error(e))
  .onSuccess(System.out::println);

There are more variations available what to do with a returned Future:

  • otherwise: return a value or call a function when the future failed
  • recover: call a future instead of the failed one and return its value
  • onComplete: run code when it is done, regardless of success or failure

There are a few more. Future functions can be chained using
compose or run in parallel using CompositeFuture

One common pattern with Futures: Only check if a future succeeded or failed and not requiring a return value to look at.
In this case we use a Future<Void> Void is a java.lang class.

More samples

Chaining multiple Futures (looks like JavaScript promises, isn't it?):

myFirstMethod()
   .compose(v -> mySecondMethod())
   .compose(secondResult -> thirdMethod(someVal, secondResult))
   .compose(this::numberFour)
   .onSuccess(System.out::println)
   .onFailure(logger::error);

The first call myFirstMethod() returns a Future<Void>. By convention we use v to indicate that it either returns Void or we don't need the actual result, but only the fact of success or failure. The :: syntax takes a little while to get used to.

When the sequence of a Future execution doesn't matter, e.g. calling a few independent services, but we only continue when they are all done, CompositeFuture is our friend. Lets say we have a series of services we need to call and assemble the result as a JsonObject:

Future<JsonObject> callAllServices(List<Supplier<Future<JsonObject>> services) {
    return Future.future(promise -> {
        List<Future<JsonObject>> list = services.stream()
                                                .forEach(Supplier::get)
                                                .collect(Collectors.toList());
        CompositeFuture.all(list)
           .onFailure(promise::fail)
           .onSuccess(v -> {
                final JsonObject result = new JsonObject();
                list.stream().forEach(r -> result.mergeIn(r.result()));
                promise.complete(result);
           });
    });
}

The method will call all Suppliers and return a result when all services return successful. When your business logic allows to live with available service only, the method looks slightly different (look carefully):

Future<JsonObject> callAvailableServices(List<Supplier<Future<JsonObject>> services) {
    return Future.future(promise -> {
        List<Future<JsonObject>> list = services.stream()
                                                .forEach(Supplier::get)
                                                .collect(Collectors.toList());
        CompositeFuture.any(list)
           .onFailure(promise::fail)
           .onSuccess(v -> {
                final JsonObject result = new JsonObject();
                list.stream().filter(Future::succeeded)
                    .forEach(r -> result.mergeIn(r.result()));
                promise.complete(result);
           });
    });
}

The difference is the call to CompositeFuture.any and filtering out results with .filter(Future::succeeded). On lesson learned. Using Streams and Futures and composition: actual work lives best in distinct functions that implement common interfaces

The dense syntax takes a while to get used to and there is more to learn, so YMMV


Posted by on 06 January 2022 | Comments (0) | categories: Domino Singapore

Comments

  1. No comments yet, be the first to comment