219. Combining newVirtualThreadPerTaskExecutor() and streams

Streams and newVirtualThreadPerTaskExecutor() make a handy combination. Here is an example that relies on IntStream to submit 10 simple tasks and collects the returned Future instances in a List:

try (ExecutorService executor
      = Executors.newVirtualThreadPerTaskExecutor()) {
  List<Future<String>> futures = IntStream.range(0, 10)
    .mapToObj(i -> executor.submit(() -> {
       return Thread.currentThread().toString()
         + "(" + i + ")";
  })).collect(toList());
  // the next snippet of code goes here
}

Next, we wait for each Future to complete by calling the get() method:

  futures.forEach(f -> {
    try {
      logger.info(f.get());
    } catch (InterruptedException | ExecutionException ex) {
      // handle exception
    }
  });
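
Put together, the two snippets above can be sketched as one small, self-contained program. The class name SubmitWithStreams and the helper runTasks() are hypothetical names introduced only for illustration, and wrapping the checked exceptions in an IllegalStateException is just one possible way to fill in the "handle exception" placeholder:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

public class SubmitWithStreams {

    // Hypothetical helper: submits `count` tasks, each on its own virtual
    // thread, and returns their results in submission order.
    static List<String> runTasks(int count) {
        try (ExecutorService executor
                = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> futures = IntStream.range(0, count)
                .mapToObj(i -> executor.submit(
                    () -> Thread.currentThread() + "(" + i + ")"))
                .toList();

            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // blocks until this task is done
            }
            return results;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("Interrupted while waiting", ex);
        } catch (ExecutionException ex) {
            // Assumption: surfacing the task's own exception as the cause
            throw new IllegalStateException("A task failed", ex.getCause());
        }
    }

    public static void main(String[] args) {
        runTasks(10).forEach(System.out::println);
    }
}
```

Each printed line is the string form of a virtual thread followed by the task index in parentheses. The thread identifiers themselves vary from run to run.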

Moreover, stream pipelines are quite useful in combination with invokeAll(). For instance, the following stream pipeline returns a List of results (it filters out every Future that has not completed successfully):

List<String> results = executor.invokeAll(
  List.of(() -> "pass01", () -> "pass02", () -> "pass03"))
  .stream()
  .filter(f -> f.state() == Future.State.SUCCESS)
  .<String>mapMulti((f, c) -> {
    c.accept((String) f.resultNow());
  }).collect(Collectors.toList());
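
Since mapMulti() may emit zero or one element per input, it can also absorb the filter() step. The following is a minimal sketch of that idea, not a replacement for the code above. The class name MapMultiFilter and the helper successfulResults() are hypothetical, and the tasks are assumed to be declared as a List<Callable<String>>, which also removes the need for the cast:

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MapMultiFilter {

    // Hypothetical helper: runs all tasks and keeps only successful results.
    static List<String> successfulResults(List<Callable<String>> tasks) {
        try (ExecutorService executor
                = Executors.newVirtualThreadPerTaskExecutor()) {
            return executor.invokeAll(tasks).stream()
                .<String>mapMulti((future, sink) -> {
                    // Emit a result only for tasks that succeeded,
                    // so no separate filter() is needed
                    if (future.state() == Future.State.SUCCESS) {
                        sink.accept(future.resultNow());
                    }
                })
                .toList();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("Interrupted while waiting", ex);
        }
    }

    public static void main(String[] args) {
        List<Callable<String>> tasks = List.of(
            () -> "pass01", () -> "pass02".substring(50), () -> "pass03");
        System.out.println(successfulResults(tasks)); // [pass01, pass03]
    }
}
```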

Alternatively, we can write the following solution (without mapMulti()):

List<String> results = executor.invokeAll(
  List.of(() -> "pass01", () -> "pass02", () -> "pass03"))
  .stream()
  .filter(f -> f.state() == Future.State.SUCCESS)
  .map(f -> f.resultNow().toString())
  .toList();

Of course, if a List<Object> is all you need, then you can use Future::resultNow directly, as follows:

List<Object> results = executor.invokeAll(
  List.of(() -> "pass01", () -> "pass02", () -> "pass03"))
  .stream()
  .filter(f -> f.state() == Future.State.SUCCESS)
  .map(Future::resultNow)
  .toList();
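
The cast, the toString() call, and the List<Object> result in the snippets above all seem to stem from passing the lambdas inline, which leaves the compiler to infer Object as the result type. As a hedged sketch, declaring the tasks as a List<Callable<String>> first should make invokeAll() return a List<Future<String>>, so Future::resultNow yields a String directly. The class name TypedInvokeAll and the helper successfulResults() are hypothetical names used only for this illustration:

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TypedInvokeAll {

    // Hypothetical helper: with typed tasks, invokeAll() returns
    // List<Future<String>>, so no cast or toString() is required.
    static List<String> successfulResults(List<Callable<String>> tasks) {
        try (ExecutorService executor
                = Executors.newVirtualThreadPerTaskExecutor()) {
            return executor.invokeAll(tasks).stream()
                .filter(f -> f.state() == Future.State.SUCCESS)
                .map(Future::resultNow) // already a String
                .toList();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("Interrupted while waiting", ex);
        }
    }

    public static void main(String[] args) {
        List<Callable<String>> tasks
            = List.of(() -> "pass01", () -> "pass02", () -> "pass03");
        System.out.println(successfulResults(tasks));
    }
}
```

The returned list follows the order of the task list, because invokeAll() returns its Future instances in the same order as the tasks it was given.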

On the other hand, you may need to collect the exceptions of all the Future instances that have completed exceptionally. This can be achieved via exceptionNow() as follows (in the given List<Callable>, we intentionally sneaked in a Callable that throws a StringIndexOutOfBoundsException, () -> "pass02".substring(50)):

List<Throwable> exceptions = executor.invokeAll(
  List.of(() -> "pass01",
          () -> "pass02".substring(50), () -> "pass03"))
  .stream()
  .filter(f -> f.state() == Future.State.FAILED)
  .<Throwable>mapMulti((f, c) -> {
    c.accept((Throwable) f.exceptionNow());
  }).collect(Collectors.toList());

If you prefer not to use mapMulti(), then rely on the classical approach:

List<Throwable> exceptions = executor.invokeAll(
  List.of(() -> "pass01", () -> "pass02".substring(50),
          () -> "pass03"))
  .stream()
  .filter(f -> f.state() == Future.State.FAILED)
  .map(Future::exceptionNow)
  .toList();
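
When both the results and the exceptions are needed, calling invokeAll() twice would run the tasks twice. One possible alternative, sketched here under the assumption that the tasks are declared as a List<Callable<String>>, is to group the returned Future instances by their state in a single pass. The class name GroupByState and the helper runAndGroup() are hypothetical:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class GroupByState {

    // Hypothetical helper: runs the tasks once and groups the completed
    // Future instances by state (SUCCESS, FAILED, and so on).
    static Map<Future.State, List<Future<String>>> runAndGroup(
            List<Callable<String>> tasks) {
        try (ExecutorService executor
                = Executors.newVirtualThreadPerTaskExecutor()) {
            return executor.invokeAll(tasks).stream()
                .collect(Collectors.groupingBy(Future::state));
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("Interrupted while waiting", ex);
        }
    }

    public static void main(String[] args) {
        List<Callable<String>> tasks = List.of(
            () -> "pass01", () -> "pass02".substring(50), () -> "pass03");

        Map<Future.State, List<Future<String>>> byState = runAndGroup(tasks);

        // A state with no matching Future has no key, hence getOrDefault()
        List<String> results = byState
            .getOrDefault(Future.State.SUCCESS, List.of())
            .stream().map(Future::resultNow).toList();
        List<Throwable> exceptions = byState
            .getOrDefault(Future.State.FAILED, List.of())
            .stream().map(Future::exceptionNow).toList();

        System.out.println(results);    // [pass01, pass03]
        System.out.println(exceptions); // one StringIndexOutOfBoundsException
    }
}
```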

You can find all these examples in the bundled code.