Introducing the ExecutorService invoke all/any for virtual threads – part 2 – Concurrency – Virtual Threads, Structured Concurrency

217. Introducing the ExecutorService invoke all/any for virtual threads – part 2

Earlier, in Problem 203, we wrote a piece of “unstructured” concurrency code for building a testing team of three testers served by an external server.Now, let’s try to re-write the buildTestingTeam() method via invokeAll()/Any() and newVirtualThreadPerTaskExecutor(). If we rely on invokeAll() then the application will attempt to load three testers by id as follows:

public static TestingTeam buildTestingTeam()
      throws InterruptedException {
      
  try (ExecutorService executor
      = Executors.newVirtualThreadPerTaskExecutor()) {
    List<Future<String>> futures = executor.invokeAll(
      List.of(() -> fetchTester(1),
              () -> fetchTester(2),
              () -> fetchTester(3)));
          
    futures.forEach(f -> logger.info(() -> “State: “
      + f.state()));
    return new TestingTeam(futures.get(0).resultNow(),
      futures.get(1).resultNow(), futures.get(2).resultNow());                      
  }      
}

We have three testers with ids 1, 2, and 3. So, the output will be:

[07:47:32] State: SUCCESS
[07:47:32] State: SUCCESS
[07:47:32] State: SUCCESS

In the next problem, we will see how we can take decisions based on task state.If we can handle the testing phase even with a single tester then we can rely on invokeAny() as follows:

public static TestingTeam buildTestingTeam()
      throws InterruptedException, ExecutionException {
      
  try (ExecutorService executor
    = Executors.newVirtualThreadPerTaskExecutor()) {
    String result = executor.invokeAny(
      List.of(() -> fetchTester(1),
              () -> fetchTester(2),
              () -> fetchTester(3)));
          
    logger.info(result);
    return new TestingTeam(result);
  }      
}

This code will return a single result representing one of these three testers. If none of them is available then we will get an UserNotFoundException exception.

218. Hooking task state

Starting with JDK 19 we can rely on Future#state(). This method computes the state of a Future based on the well-known get(), isDone(), and isCancelled() and returns a Future.State enum entry as follows:

CANCELLED – The task was canceled.

FAILED – The task was completed exceptionally (with an exception).

RUNNING – The task is still running (has not been completed).

SUCCESS – The task was completed normally with a result (no exception).

In the following snippet of code, we analyze the state of loading the testing team members and act accordingly:

public static TestingTeam buildTestingTeam()
       throws InterruptedException {
  List<String> testers = new ArrayList<>();
      
  try (ExecutorService executor
      = Executors.newVirtualThreadPerTaskExecutor()) {
    List<Future<String>> futures = executor.invokeAll(
      List.of(() -> fetchTester(Integer.MAX_VALUE),
              () -> fetchTester(2),
              () -> fetchTester(Integer.MAX_VALUE)));
    futures.forEach(f -> {
      logger.info(() -> “Analyzing ” + f + ” state …”);            
              
      switch (f.state()) {
        case RUNNING -> throw new IllegalStateException(
          “Future is still in the running state …”);
        case SUCCESS -> {
          logger.info(() -> “Result: ” + f.resultNow());
          testers.add(f.resultNow());
        }
        case FAILED ->
          logger.severe(() -> “Exception: “
            + f.exceptionNow().getMessage());
        case CANCELLED ->
          logger.info(“Cancelled ?!?”);
      }
    });                      
  }
      
  return new TestingTeam(testers.toArray(String[]::new));
}

We know that when the execution reaches the switch block the Future objects should be completely normal or exceptional. So, if the current Future state is RUNNING then this is a really weird situation (possibly a bug) and we throw an IllegalStateException. Next, if the Future state is SUCCESS (fetchTester(2)) then we have a result that can be obtained via resultNow(). This method was added in JDK 19 and it is useful when we know that we have a result. The resultNow() method returns immediately without waiting (as get()). If the state is FAILED (fetchTester(Integer.MAX_VALUE)) then we log the exception via exceptionNow(). This method was also added in JDK 19 and it returns immediately the underlying exception of a failed Future. Finally, if the Future was canceled then there is nothing to do. We just report it in the log.

Introducing the ExecutorService invoke all/any for virtual threads – part 1 – Concurrency – Virtual Threads, Structured Concurrency

216. Introducing the ExecutorService invoke all/any for virtual threads – part 1

We have introduced the ExecutorService’s invokeAll()/invokeAny() in Java Coding Problems, First Edition, Chapter 10, Problem 207.

Working with invokeAll()

In a nutshell, invokeAll() executes a collection of tasks (Callable) and returns a List<Future> that holds the results/status of each task. The tasks can finish naturally or forced by a given timeout. Each task can finish successfully or exceptionally. Upon return, all the tasks that have not been completed yet are automatically canceled. We can check out the status of each task via Future#isDone() and Future#isCancelled().

<T> List<Future<T>> invokeAll(Collection<? extends
  Callable<T>> tasks) throws InterruptedException
<T> List<Future<T>> invokeAll(
  Collection<? extends Callable<T>> tasks, long timeout,
    TimeUnit unit) throws InterruptedException

Using invokeAll() with virtual threads via newVirtualThreadPerTaskExecutor() is straightforward (or with newThreadPerTaskExecutor()). For instance, here we have a simple example of executing three Callable instances:

try (ExecutorService executor
    = Executors.newVirtualThreadPerTaskExecutor()) {
  List<Future<String>> futures = executor.invokeAll(
    List.of(() -> “pass01”, () -> “pass02”, () -> “pass03”));
          
  futures.forEach(f -> logger.info(() ->
    “State: ” + f.state()));
}

Have you spotted the f.state() call? This API was introduced in JDK 19 and it computes the state of a future based on the well-known get(), isDone(), and isCancelled(). While we will detail this in a subsequent problem, at this moment the output will be as follows:

[10:17:41] State: SUCCESS
[10:17:41] State: SUCCESS
[10:17:41] State: SUCCESS

The three tasks have successfully completed.

Working with invokeAny()

In a nutshell, invokeAny() executes a collection of tasks (Callable) and strives to return a result corresponding to a task that has successfully terminated (before the given timeout, if any). All the tasks that have not been completed are automatically canceled.

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
  throws InterruptedException, ExecutionException
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
  long timeout, TimeUnit unit) throws InterruptedException,
    ExecutionException, TimeoutException

Using invokeAny() with virtual threads via newVirtualThreadPerTaskExecutor() is also straightforward (or with newThreadPerTaskExecutor()). For instance, here we have a simple example of executing three Callable instances while we are interested in a single result:

try (ExecutorService executor
    = Executors.newVirtualThreadPerTaskExecutor()) {
  String result = executor.invokeAny(
    List.of(() -> “pass01”, () -> “pass02”, () -> “pass03”));
          
  logger.info(result);
}

A possible output can be:

[10:29:33] pass02

This output corresponds to the second Callable.In the next problem, we will come up with a more realistic example.