Introducing the ExecutorService invoke all/any for virtual threads – part 2 – Concurrency – Virtual Threads, Structured Concurrency

217. Introducing the ExecutorService invoke all/any for virtual threads – part 2

Earlier, in Problem 203, we wrote a piece of “unstructured” concurrency code for building a testing team of three testers served by an external server.Now, let’s try to re-write the buildTestingTeam() method via invokeAll()/Any() and newVirtualThreadPerTaskExecutor(). If we rely on invokeAll() then the application will attempt to load three testers by id as follows:

public static TestingTeam buildTestingTeam()
      throws InterruptedException {
      
  try (ExecutorService executor
      = Executors.newVirtualThreadPerTaskExecutor()) {
    List<Future<String>> futures = executor.invokeAll(
      List.of(() -> fetchTester(1),
              () -> fetchTester(2),
              () -> fetchTester(3)));
          
    futures.forEach(f -> logger.info(() -> “State: “
      + f.state()));
    return new TestingTeam(futures.get(0).resultNow(),
      futures.get(1).resultNow(), futures.get(2).resultNow());                      
  }      
}

We have three testers with ids 1, 2, and 3. So, the output will be:

[07:47:32] State: SUCCESS
[07:47:32] State: SUCCESS
[07:47:32] State: SUCCESS

In the next problem, we will see how we can take decisions based on task state.If we can handle the testing phase even with a single tester then we can rely on invokeAny() as follows:

public static TestingTeam buildTestingTeam()
      throws InterruptedException, ExecutionException {
      
  try (ExecutorService executor
    = Executors.newVirtualThreadPerTaskExecutor()) {
    String result = executor.invokeAny(
      List.of(() -> fetchTester(1),
              () -> fetchTester(2),
              () -> fetchTester(3)));
          
    logger.info(result);
    return new TestingTeam(result);
  }      
}

This code will return a single result representing one of these three testers. If none of them is available then we will get an UserNotFoundException exception.

218. Hooking task state

Starting with JDK 19 we can rely on Future#state(). This method computes the state of a Future based on the well-known get(), isDone(), and isCancelled() and returns a Future.State enum entry as follows:

CANCELLED – The task was canceled.

FAILED – The task was completed exceptionally (with an exception).

RUNNING – The task is still running (has not been completed).

SUCCESS – The task was completed normally with a result (no exception).

In the following snippet of code, we analyze the state of loading the testing team members and act accordingly:

public static TestingTeam buildTestingTeam()
       throws InterruptedException {
  List<String> testers = new ArrayList<>();
      
  try (ExecutorService executor
      = Executors.newVirtualThreadPerTaskExecutor()) {
    List<Future<String>> futures = executor.invokeAll(
      List.of(() -> fetchTester(Integer.MAX_VALUE),
              () -> fetchTester(2),
              () -> fetchTester(Integer.MAX_VALUE)));
    futures.forEach(f -> {
      logger.info(() -> “Analyzing ” + f + ” state …”);            
              
      switch (f.state()) {
        case RUNNING -> throw new IllegalStateException(
          “Future is still in the running state …”);
        case SUCCESS -> {
          logger.info(() -> “Result: ” + f.resultNow());
          testers.add(f.resultNow());
        }
        case FAILED ->
          logger.severe(() -> “Exception: “
            + f.exceptionNow().getMessage());
        case CANCELLED ->
          logger.info(“Cancelled ?!?”);
      }
    });                      
  }
      
  return new TestingTeam(testers.toArray(String[]::new));
}

We know that when the execution reaches the switch block the Future objects should be completely normal or exceptional. So, if the current Future state is RUNNING then this is a really weird situation (possibly a bug) and we throw an IllegalStateException. Next, if the Future state is SUCCESS (fetchTester(2)) then we have a result that can be obtained via resultNow(). This method was added in JDK 19 and it is useful when we know that we have a result. The resultNow() method returns immediately without waiting (as get()). If the state is FAILED (fetchTester(Integer.MAX_VALUE)) then we log the exception via exceptionNow(). This method was also added in JDK 19 and it returns immediately the underlying exception of a failed Future. Finally, if the Future was canceled then there is nothing to do. We just report it in the log.