Introducing the ExecutorService invoke all/any for virtual threads – part 2 – Concurrency – Virtual Threads, Structured Concurrency

217. Introducing the ExecutorService invoke all/any for virtual threads – part 2

Earlier, in Problem 203, we wrote a piece of “unstructured” concurrency code for building a testing team of three testers served by an external server.Now, let’s try to re-write the buildTestingTeam() method via invokeAll()/Any() and newVirtualThreadPerTaskExecutor(). If we rely on invokeAll() then the application will attempt to load three testers by id as follows:

public static TestingTeam buildTestingTeam()
      throws InterruptedException {
      
  try (ExecutorService executor
      = Executors.newVirtualThreadPerTaskExecutor()) {
    List<Future<String>> futures = executor.invokeAll(
      List.of(() -> fetchTester(1),
              () -> fetchTester(2),
              () -> fetchTester(3)));
          
    futures.forEach(f -> logger.info(() -> “State: “
      + f.state()));
    return new TestingTeam(futures.get(0).resultNow(),
      futures.get(1).resultNow(), futures.get(2).resultNow());                      
  }      
}

We have three testers with ids 1, 2, and 3. So, the output will be:

[07:47:32] State: SUCCESS
[07:47:32] State: SUCCESS
[07:47:32] State: SUCCESS

In the next problem, we will see how we can take decisions based on task state.If we can handle the testing phase even with a single tester then we can rely on invokeAny() as follows:

public static TestingTeam buildTestingTeam()
      throws InterruptedException, ExecutionException {
      
  try (ExecutorService executor
    = Executors.newVirtualThreadPerTaskExecutor()) {
    String result = executor.invokeAny(
      List.of(() -> fetchTester(1),
              () -> fetchTester(2),
              () -> fetchTester(3)));
          
    logger.info(result);
    return new TestingTeam(result);
  }      
}

This code will return a single result representing one of these three testers. If none of them is available then we will get an UserNotFoundException exception.

218. Hooking task state

Starting with JDK 19 we can rely on Future#state(). This method computes the state of a Future based on the well-known get(), isDone(), and isCancelled() and returns a Future.State enum entry as follows:

CANCELLED – The task was canceled.

FAILED – The task was completed exceptionally (with an exception).

RUNNING – The task is still running (has not been completed).

SUCCESS – The task was completed normally with a result (no exception).

In the following snippet of code, we analyze the state of loading the testing team members and act accordingly:

public static TestingTeam buildTestingTeam()
       throws InterruptedException {
  List<String> testers = new ArrayList<>();
      
  try (ExecutorService executor
      = Executors.newVirtualThreadPerTaskExecutor()) {
    List<Future<String>> futures = executor.invokeAll(
      List.of(() -> fetchTester(Integer.MAX_VALUE),
              () -> fetchTester(2),
              () -> fetchTester(Integer.MAX_VALUE)));
    futures.forEach(f -> {
      logger.info(() -> “Analyzing ” + f + ” state …”);            
              
      switch (f.state()) {
        case RUNNING -> throw new IllegalStateException(
          “Future is still in the running state …”);
        case SUCCESS -> {
          logger.info(() -> “Result: ” + f.resultNow());
          testers.add(f.resultNow());
        }
        case FAILED ->
          logger.severe(() -> “Exception: “
            + f.exceptionNow().getMessage());
        case CANCELLED ->
          logger.info(“Cancelled ?!?”);
      }
    });                      
  }
      
  return new TestingTeam(testers.toArray(String[]::new));
}

We know that when the execution reaches the switch block the Future objects should be completely normal or exceptional. So, if the current Future state is RUNNING then this is a really weird situation (possibly a bug) and we throw an IllegalStateException. Next, if the Future state is SUCCESS (fetchTester(2)) then we have a result that can be obtained via resultNow(). This method was added in JDK 19 and it is useful when we know that we have a result. The resultNow() method returns immediately without waiting (as get()). If the state is FAILED (fetchTester(Integer.MAX_VALUE)) then we log the exception via exceptionNow(). This method was also added in JDK 19 and it returns immediately the underlying exception of a failed Future. Finally, if the Future was canceled then there is nothing to do. We just report it in the log.

Introducing the ExecutorService invoke all/any for virtual threads – part 1 – Concurrency – Virtual Threads, Structured Concurrency

216. Introducing the ExecutorService invoke all/any for virtual threads – part 1

We have introduced the ExecutorService’s invokeAll()/invokeAny() in Java Coding Problems, First Edition, Chapter 10, Problem 207.

Working with invokeAll()

In a nutshell, invokeAll() executes a collection of tasks (Callable) and returns a List<Future> that holds the results/status of each task. The tasks can finish naturally or forced by a given timeout. Each task can finish successfully or exceptionally. Upon return, all the tasks that have not been completed yet are automatically canceled. We can check out the status of each task via Future#isDone() and Future#isCancelled().

<T> List<Future<T>> invokeAll(Collection<? extends
  Callable<T>> tasks) throws InterruptedException
<T> List<Future<T>> invokeAll(
  Collection<? extends Callable<T>> tasks, long timeout,
    TimeUnit unit) throws InterruptedException

Using invokeAll() with virtual threads via newVirtualThreadPerTaskExecutor() is straightforward (or with newThreadPerTaskExecutor()). For instance, here we have a simple example of executing three Callable instances:

try (ExecutorService executor
    = Executors.newVirtualThreadPerTaskExecutor()) {
  List<Future<String>> futures = executor.invokeAll(
    List.of(() -> “pass01”, () -> “pass02”, () -> “pass03”));
          
  futures.forEach(f -> logger.info(() ->
    “State: ” + f.state()));
}

Have you spotted the f.state() call? This API was introduced in JDK 19 and it computes the state of a future based on the well-known get(), isDone(), and isCancelled(). While we will detail this in a subsequent problem, at this moment the output will be as follows:

[10:17:41] State: SUCCESS
[10:17:41] State: SUCCESS
[10:17:41] State: SUCCESS

The three tasks have successfully completed.

Working with invokeAny()

In a nutshell, invokeAny() executes a collection of tasks (Callable) and strives to return a result corresponding to a task that has successfully terminated (before the given timeout, if any). All the tasks that have not been completed are automatically canceled.

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
  throws InterruptedException, ExecutionException
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
  long timeout, TimeUnit unit) throws InterruptedException,
    ExecutionException, TimeoutException

Using invokeAny() with virtual threads via newVirtualThreadPerTaskExecutor() is also straightforward (or with newThreadPerTaskExecutor()). For instance, here we have a simple example of executing three Callable instances while we are interested in a single result:

try (ExecutorService executor
    = Executors.newVirtualThreadPerTaskExecutor()) {
  String result = executor.invokeAny(
    List.of(() -> “pass01”, () -> “pass02”, () -> “pass03”));
          
  logger.info(result);
}

A possible output can be:

[10:29:33] pass02

This output corresponds to the second Callable.In the next problem, we will come up with a more realistic example.

Combining newVirtualThreadPerTaskExecutor() and streams – Concurrency – Virtual Threads, Structured Concurrency

219. Combining newVirtualThreadPerTaskExecutor() and streams

Streams and newVirtualThreadPerTaskExecutor() is a handy combination. Here is an example that relies on IntStream to submit 10 simple tasks and collect the returned List of Future:

try (ExecutorService executor
      = Executors.newVirtualThreadPerTaskExecutor()) {
  List<Future<String>> futures = IntStream.range(0, 10)
    .mapToObj(i -> executor.submit(() -> {
       return Thread.currentThread().toString()
         + “(” + i + “)”;
  })).collect(toList());
  // here we have the following snippet of code
}

Next, we wait for each Future to complete by calling the get() method:

  futures.forEach(f -> {
    try {
      logger.info(f.get());
    } catch (InterruptedException | ExecutionException ex) {
      // handle exception
    }
  });

Moreover, using stream pipelines is quite useful in combination with invokeAll(). For instance, the following stream pipeline returns a List of results (it filters all Future that hasn’t complete successfully):

List<String> results = executor.invokeAll(
  List.of(() -> “pass01”, () -> “pass02”, () -> “pass03”))
  .stream()
  .filter(f -> f.state() == Future.State.SUCCESS)
  .<String>mapMulti((f, c) -> {
    c.accept((String) f.resultNow());
  }).collect(Collectors.toList());

Alternatively, we can write the following solution (without mapMulti()):

List<String> results = executor.invokeAll(
  List.of(() -> “pass01”, () -> “pass02”, () -> “pass03”))
  .stream()
  .filter(f -> f.state() == Future.State.SUCCESS)
  .map(f -> f.resultNow().toString())
  .toList();

Of course, if List<Object> is all you need then you can go straightforward via Future::resultNow as follows:

List<Object> results = executor.invokeAll(
  List.of(() -> “pass01”, () -> “pass02”, () -> “pass03”))
  .stream()
  .filter(f -> f.state() == Future.State.SUCCESS)
  .map(Future::resultNow)
  .toList();

On the other hand, you may need to collect all the Future that has been completed exceptionally. This can be achieved via exceptionNow() as follows (we intentionally sneaked in the given List<Callable> a Callable that will generate an StringIndexOutOfBoundsException, () -> “pass02”.substring(50)):

List<Throwable> exceptions = executor.invokeAll(
  List.of(() -> “pass01”,
          () -> “pass02”.substring(50), () -> “pass03”))
  .stream()
  .filter(f -> f.state() == Future.State.FAILED)
  .<Throwable>mapMulti((f, c) -> {
    c.accept((Throwable) f.exceptionNow());
  }).collect(Collectors.toList());

If you don’t prefer mapMulti() then rely on the classical approach:

List<Throwable> exceptions = executor.invokeAll(
  List.of(() -> “pass01”, () -> “pass02”.substring(50),
          () -> “pass03”))
  .stream()
  .filter(f -> f.state() == Future.State.FAILED)
  .map(Future::exceptionNow)
  .toList();

You can find all these examples in the bundled code.

Introducing scope object (StructuredTaskScope) 2 – Concurrency – Virtual Threads, Structured Concurrency

First, we create a StructuredTaskScope in a try-with-resources pattern. StructuredTaskScope implements AutoCloseable:

try (StructuredTaskScope scope
      = new StructuredTaskScope<String>()) {
    …
}

The scope is a wrapper for the virtual threads’ lifetime. We use the scope to fork as many virtual threads (subtasks) as needed via the fork(Callable task) method. Here, we fork only one virtual thread and get back a Subtask (forking is a non-blocking operation):

Subtask<String> subtask = scope.fork(() -> fetchTester(1));

Next, we have to call the join() method (or joinUntil(Instant deadline)). This method waits for all threads (all Subtask instances) forked from this scope (or, all threads that have been submitted to this scope) to complete, so it is a blocking operation. A scope should block only for waiting its subtasks to complete, and this is happening via join() or joinUntil().

scope.join();          

When the execution passes this line, we know that all threads (all forked Subtask) forked from this scope are complete with a result or an exception (each subtask run indenpendently, therefore each of them can complete with a result or an exception). Here, we call the non-blocking get() mehod to get the result but pay attention that calling get() for a task that did not complete will rise an exception as IllegalStateException(“Owner did not join after forking subtask”):

String result = subtask.get();

On the other hand, we can obtain the exception of a failed task via exception(). However, if we call exception() for a subtask (Subtask) that is completed with a result then we will get back an exception as IllegalStateException(“Subtask not completed or did not complete with exception”). So, if you are not sure that your task(s) is always complete with a result or an exception, it is better to call get() or exception() only after you test the state of the corresponding Subtask. A state of SUCCESS will safely allow you to call get(), while a state of FAILED will safely allow you to call exception(). So, in our case, we may prefer it this way:

String result = “”;
if (subtask.state().equals(Subtask.State.SUCCESS)) {
  result = subtask.get();
}

Beside Subtask.State.SUCCESS and Subtask.State.FAILED, we also have Subtask.State.UNAVAILABLE which means that the subtask is not available (for instance, if the subtask is still running then its state is UNAVAILABLE, but it could be other cause as well).That’s all!

ExecutorService vs. StructuredTaskScope

The previous code looks like the code that we write via a classical ExecutorService but, there are two big differences between these solutions. First of all, an ExecutorService holds the precious platform threads and allows us to pool them. On the other hand, a StructuredTaskScope is just a thin launcher for virtual threads that are cheap and shouldn’t be pooled. So, once we’ve done our job, a StructuredTaskScope can be destroyed and garbage collected. Second, an ExecutorService holds a single queue for all the tasks and the threads take from this queue whenever they have the chance to do it. A StructuredTaskScope relies on a fork/join pool and each virtual thread has it own wait queue. However, a virtual thread can steal a task from another queue as well. This is known as the work-stealing pattern and it was covered in Java Coding Problem, First Edition, Chapter 11.

Introducing scope object (StructuredTaskScope) – Concurrency – Virtual Threads, Structured Concurrency

220. Introducing scope object (StructuredTaskScope)

So far, we have covered a bunch of problems that use virtual threads directly or indirectly via an ExecutorService. We already know that virtual threads are cheap to create and block and that an application can run millions of them. We don’t need to reuse them, pool them or do any fancy stuff. Use and throw is the proper and recommended way to deal with virtual threads. This means that virtual threads are very useful for expressing and writing asynchronous code which is commonly based on a lot of threads that are capable of blocking/unblocking many times in a short period of time. On the other hand, we know that OS threads are expensive to create, very expensive to block, and are not easy to put in an asynchronous context.Before virtual threads (so, for many many years), we had to manage the lifecycle of OS threads via an ExecutorService/Executor and we could write asynchronous (or reactive) code via callbacks (you can find detailed coverage of asynchronous programming in Java Coding Problems, First Edition, Chapter 11).However, asynchronous/reactive code is hard to write/read, very hard to debug and profile, and almost deadly hard to unit test. Nobody wants to read and fix your asynchronous code! Moreover, once we start to write an application via asynchronous callback we tend to use this model for all tasks, even for those that shouldn’t be asynchronous. We can easily fall into this trap when we need to link somehow asynchronous code/results to non-asynchronous code. And, the easiest way to do it is to go only for asynchronous code.So, is there a better way? Yes, it is! Structured Concurrency should be the right answer. Structured Concurrency has started as an incubator project and reached the preview stage in JDK 21 (JEP 453).And, in this context, we should introduce StructuredTaskScope. A StructuredTaskScope is a virtual thread launcher for Callable tasks that returns a Subtask. A subtask is an extension of the well-known Supplier<T> fuctional interface represented by the StructuredTaskScope.Subtask<T> interface and forked with StructuredTaskScope.fork(Callable task). It follows and works based on the fundamental principle of structured concurrency (see Problem 203) – When a task has to be solved concurrently then all the threads needed to solve it are spun and rejoined in the same block of code. In other words, all these threads’ lifetime is bound to the block’s scope, so we have clear and explicit entry-exit points for each concurrent code block. These threads are responsible to run subtasks (Subtask) of the given task as a single unit of work.Let’s have an example of fetching a single tester (with id 1) from our web server via StructuredTaskScope:

public static TestingTeam buildTestingTeam()
       throws InterruptedException {
     
  try (StructuredTaskScope scope
      = new StructuredTaskScope<String>()) {
    Subtask<String> subtask
      = scope.fork(() -> fetchTester(1));
    logger.info(() -> “Waiting for ” + subtask.toString()
      + ” to finish …\n”);
    scope.join();          
    String result = subtask.get();
    logger.info(result);
          
    return new TestingTeam(result);
  }      
}

Introducing ShutdownOnSuccess – Concurrency – Virtual Threads, Structured Concurrency

221. Introducing ShutdownOnSuccess

In the previous problem, we introduced StructuredTaskScope and use it to solve a task via a single virtual thread (a single Subtask). Basically, we fetched the tester with id 1 from our server (we had to wait until this one was available). Next, let’s assume that we still need a single tester, but not mandatory the one with id 1. This time, it could be any of ids 1, 2, or 3. We simply take the first one that is available from these three, and we cancel the other two requests.Especially for such scenarios, we have an extension of StructuredTaskScope called StructuredTaskScope.ShutdownOnSuccess. This scope is capable to return the result of the first task that completes successfully and interrupts the rest of the threads. It follows the “invoke any” model and it can be used as follows:

public static TestingTeam buildTestingTeam()
       throws InterruptedException, ExecutionException {
      
  try (ShutdownOnSuccess scope
      = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
          
    Subtask<String> subtask1
      = scope.fork(() -> fetchTester(1));
    Subtask<String> subtask2
      = scope.fork(() -> fetchTester(2));
    Subtask<String> subtask3
      = scope.fork(() -> fetchTester(3));
                                               
    scope.join();
          
    logger.info(() -> “Subtask-1 state: ” + future1.state());
    logger.info(() -> “Subtask-2 state: ” + future2.state());
    logger.info(() -> “Subtask-3 state: ” + future3.state());
    String result = (String) scope.result();
    logger.info(result);
          
    return new TestingTeam(result);
  }
}

Here, we fork three subtasks (threads) that will compete with each other to complete. The first subtask (thread) that completes successfully wins and returns. The result() method returns this result (if none of the subtasks (threads) complete successfully then it throws ExecutionException).If we check the state of these three Subtask we can see that one succeeds while the other two are unavailable:

[09:01:50] Subtask-1 state: UNAVAILABLE
[09:01:50] Subtask-2 state: SUCCESS
[09:01:50] Subtask-3 state: UNAVAILABLE

Of course, you don’t need the code that checks/print the state of each Subtask. It was added here just to highlight how ShutdownOnSuccess works. You don’t even need the explicit Subtask objects since we don’t call get() or something else from this API. Basically, we can resume the code to the following:

public static TestingTeam buildTestingTeam()
       throws InterruptedException, ExecutionException {
  try (ShutdownOnSuccess scope
      = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    scope.fork(() -> fetchTester(1));
    scope.fork(() -> fetchTester(2));
    scope.fork(() -> fetchTester(3));
    scope.join();
     
    return new TestingTeam((String) scope.result());
  }
}

Done! You just create the scope, fork your subtasks, call join() , and collect the result. So, the scope is really business-focused.A task that completes exceptionally under the ShutdownOnSuccess umbrella will never be chosen to produce a result. However, if all tasks complete exceptionally then we will get an ExecutionException that wraps the exception (the cause) of the first completed task.

Introducing ShutdownOnFailure – Concurrency – Virtual Threads, Structured Concurrency

222. Introducing ShutdownOnFailure

As its name suggests, StructuredTaskScope.ShutdownOnFailure is capable to return the exception of the first subtask that completes exceptionally and interrupts the rest of the subtasks (threads). For instance, we may want to fetch the testers with ids 1, 2, and 3. Since we need exactly these three testers, we want to be informed if any of them is not available and cancel everything. The code looks as follows:

public static TestingTeam buildTestingTeam()
       throws InterruptedException, ExecutionException {
      
  try (ShutdownOnFailure scope
      = new StructuredTaskScope.ShutdownOnFailure()) {
    Subtask<String> subtask1
      = scope.fork(() -> fetchTester(1));
    Subtask<String> subtask2
      = scope.fork(() -> fetchTester(2));
    Subtask<String> subtask3
      = scope.fork(() -> fetchTester(Integer.MAX_VALUE));
    scope.join();
    logger.info(() -> “Subtask-1 state: ” + subtask1.state());
    logger.info(() -> “Subtask-2 state: ” + subtask2.state());
    logger.info(() -> “Subtask-3 state: ” + subtask3.state());
    Optional<Throwable> exception = scope.exception();
    if (exception.isEmpty()) {
      logger.info(() -> “Subtask-1 result:” + subtask1.get());
      logger.info(() -> “Subtask-2 result:” + subtask2.get());
      logger.info(() -> “Subtask-3 result:” + subtask3.get());
              
      return new TestingTeam(
        subtask1.get(), subtask2.get(), subtask3.get());
    } else {
      logger.info(() -> exception.get().getMessage());
      scope.throwIfFailed();
    }
  }
       
  return new TestingTeam();
}

In this example, we intentionally replaced id 3 with Integer.MAX_VALUE. Since there is no tester with this id the server will throw UserNotFoundException. This means that the states of the subtasks will reveal that the third subtask has failed:

[16:41:15] Subtask-1 state: SUCCESS
[16:41:15] Subtask-2 state: SUCCESS
[16:41:15] Subtask-3 state: FAILED

Moreover, when we call the exception() method we will get back an Optional<Throwable> containing this exception (a deep coverage of Optional feature is available in Java Coding Problems, First Edition, Chapter 12). If we decide to throw it then we simply call the throwIfFailed() method which wraps the original exception (the cause) in an ExecutionException and throws it. The message of the exception in our case will be:

Exception in thread “main”
 java.util.concurrent.ExecutionException:
  modern.challenge.UserNotFoundException: Code: 404

If we remove the guideline code then we can compact the previous code as follows:

public static TestingTeam buildTestingTeam()
       throws InterruptedException, ExecutionException {
      
  try (ShutdownOnFailure scope
      = new StructuredTaskScope.ShutdownOnFailure()) {
    Subtask<String> subtask1
      = scope.fork(() -> fetchTester(1));
    Subtask<String> subtask2
      = scope.fork(() -> fetchTester(2));
    Subtask<String> subtask3
      = scope.fork(() -> fetchTester(
      Integer.MAX_VALUE)); // this causes exception
    scope.join();
    scope.throwIfFailed();
    // because we have an exception the following
    // code will not be executed
    return new TestingTeam(
      subtask1.get(), subtask2.get(), subtask3.get());          
  }
}

If no exception occurs then throwIfFailed() doesn’t do anything and those three testers are available. The result of each Subtask is available via the non-blocking Subtask.get().A subtask that completes exceptionally under the ShutdownOnFailure umbrella will be chosen to produce an exception. However, if all subtasks complete normally then we will not get any exceptions. On the other hand, if no subtasks were completed exceptionally but were canceled then ShutdownOnFailure will throw CancellationException.

Combining StructuredTaskScope and streams 3 – Concurrency – Virtual Threads, Structured Concurrency

Running 10000 tasks via fixed thread pool executor

Depending on your machine, the previous test may finish successfully or it may result in an OutOfMemoryError. We can avoid this unpleasant scenario by using a fixed thread pool. For instance, let’s limit the number of platform threads to 200 via the following snippet of code:

long start = System.currentTimeMillis();
       
try (ExecutorService executorFixed
    = Executors.newFixedThreadPool(200)) {
  IntStream.range(0, 10_000).forEach(i -> {
    executorFixed.submit(() -> {
      Thread.sleep(Duration.ofSeconds(1));
      logger.info(() -> “Task: ” + i);
      return i;
    });
  });
}
logger.info(() -> “Time (ms): “
  + (System.currentTimeMillis() – start));

On my machine, it took 50190 ms (50 seconds) to run these 10000 tasks using at peak 216 platform threads. The following screenshot from JMX reveals this information:

Figure 10.15 – Running 10000 tasks via fixed thread pool executor

Obviously, a smaller number of platform threads is reflected in performance. If we put 216 workers to do the job of 7729 workers it will take longer for sure. Next, let’s see how virtual threads will handle this challenge.

Running 10000 tasks via virtual thread per task executor

This time, let’s see how the newVirtualThreadPerTaskExecutor() can handle these 10000 tasks. The code is straightforward:

long start = System.currentTimeMillis();
      
try (ExecutorService executorVirtual
      = Executors.newVirtualThreadPerTaskExecutor()) {
  IntStream.range(0, 10_000).forEach(i -> {
    executorVirtual.submit(() -> {
      Thread.sleep(Duration.ofSeconds(1));
      logger.info(() -> “Task: ” + i);
      return i;
    });
  });
}
      
logger.info(() -> “Time (ms): “
  + (System.currentTimeMillis() – start));

On my machine, it took 3519 ms (3.5 seconds) to run these 10000 tasks using at peak 25 platform threads. The following screenshot from JMX reveals this information:

Figure 10.16 – Running 10000 tasks via virtual thread per task executor

Wooow! How cool is this?! The resulting time is far away the best in comparison with the previous tests and it uses fewer resources (only 25 platform threads). So, virtual threads really rock!I also strongly recommend you check out the following benchmark: https://github.com/colincachia/loom-benchmark/tree/main.Starting with JDK 21, the JMX’s HotSpotDiagnosticMXBean was enriched with the dumpThreads(String outputFile, ThreadDumpFormat format) method. This method outputs a thread dump to the given file (outputFile) in the given format (format). The thread dump will contain all platform threads but it may contain some or all virtual threads.In the following code, we attempt to obtain a thread dump for all subtasks (threads) of a StructuredTaskScope:

try (ShutdownOnSuccess scope
  = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
  Stream.of(1, 2, 3)
        .<Callable<String>>map(id -> () -> fetchTester(id))
        .forEach(scope::fork);
  HotSpotDiagnosticMXBean mBean = ManagementFactory
    .getPlatformMXBean(HotSpotDiagnosticMXBean.class);
  mBean.dumpThreads(Path.of(“dumpThreads.json”)
    .toAbsolutePath().toString(),                    
  HotSpotDiagnosticMXBean.ThreadDumpFormat.JSON);
  scope.join();
  String result = (String) scope.result();
  logger.info(result);
}

The output file is named threadDump.json and you can find it in the root folder of the application. The part of the output that we are interested in is partially listed here:


{
  “container”: “java.util.concurrent
             .StructuredTaskScope$ShutdownOnSuccess@6d311334″,
  “parent”: “<root>”,
  “owner”: “1”,
  “threads”: [
    {
    “tid”: “22”
    “name”: “”,
    “stack”: [
      …
      “java.base\/java.lang.VirtualThread
       .run(VirtualThread.java:311)”
      ]
    },
    {
    “tid”: “24”,
    “name”: “”,
    “stack”: [
      …
      “java.base\/java.lang.VirtualThread
        .run(VirtualThread.java:311)”
      ]
    },
    {
    “tid”: “25”,
    “name”: “”,
    “stack”: [
      …
      “java.base\/java.lang.VirtualThread
        .run(VirtualThread.java:311)”
      ]
    }
  ],
  “threadCount”: “3”
}

As you can see, we have three virtual threads (#22, #24, and #25) that run subtasks of our scope. In the bundled code, you can find the complete output.

Summary

This chapter covered 16 introductory problems about virtual threads and structured concurrency. You can see this chapter as the preparation for the next one, which will cover more detailed aspects of these two topics.

Combining StructuredTaskScope and streams – Concurrency – Virtual Threads, Structured Concurrency

223. Combining StructuredTaskScope and streams

If you prefer functional programming then you’ll be happy to see that streams can be used with StructuredTaskScope as well. For instance, here we re-write the application from Problem 214 using a stream pipeline for forking our tasks:

public static TestingTeam buildTestingTeam()
       throws InterruptedException, ExecutionException {
      
  try (ShutdownOnSuccess scope
      = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    Stream.of(1, 2, 3)
      .<Callable<String>>map(id -> () -> fetchTester(id))
      .forEach(scope::fork);
    scope.join();
    String result = (String) scope.result();
    logger.info(result);
         
    return new TestingTeam(result);
  }
}

Moreover, we can use stream pipelines to collect results and exceptions as follows:

public static TestingTeam buildTestingTeam()
    throws InterruptedException, ExecutionException {
 try (ShutdownOnSuccess scope
  = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
  List<Subtask> subtasks = Stream.of(Integer.MAX_VALUE, 2, 3)
   .<Callable<String>>map(id -> () -> fetchTester(id))
   .map(scope::fork)
   .toList();
  scope.join();
  List<Throwable> failed = subtasks.stream()
   .filter(f -> f.state() == Subtask.State.FAILED)
   .map(Subtask::exception)
   .toList();
  logger.info(failed.toString());
  TestingTeam result = subtasks.stream()
   .filter(f -> f.state() == Subtask.State.SUCCESS)
   .map(Subtask::get)                                     
   .collect(collectingAndThen(toList(),
     list -> { return new TestingTeam(list.toArray(
       String[]::new)); }));
  logger.info(result.toString());
           
  return result;
  }
}

You can find these examples in the bundled code.

224. Observing and monitoring virtual threads

Observing and monitoring virtual threads can be done in several ways. First, we can use JFR (Java Flight Recorder) – we have introduced this tool in Chapter 6, Problem 136.

Using JFR

Among its reach list of events, JFR can monitor and record the following events related to virtual threads:

jdk.VirtualThreadStart – this event is recorded when a virtual thread starts (by default is disabled)

jdk.VirtualThreadEnd – this event is recorded when a virtual thread ends (by default is disabled)

jdk.VirtualThreadPinned – this event is recorded when a virtual thread was parked while pinned (by default is enabled with a threshold of 20ms).

jdk.VirtualThreadSubmitFailed – this event is recorded if a virtual thread cannot be started or unparked (by default is enabled)

You can find all the JFR events at https://sap.github.io/SapMachine/jfrevents/.We start configuring JFR for monitoring the virtual threads by adding in the root folder of the application the following vtEvent.jfc file:

<?xml version=”1.0″ encoding=”UTF-8″?>
<configuration version=”2.0″ description=”test”>
  <event name=”jdk.VirtualThreadStart”>
    <setting name=”enabled”>true</setting>
    <setting name=”stackTrace”>true</setting>
  </event>
  <event name=”jdk.VirtualThreadEnd”>
    <setting name=”enabled”>true</setting>
  </event>
  <event name=”jdk.VirtualThreadPinned”>
    <setting name=”enabled”>true</setting>
    <setting name=”stackTrace”>true</setting>
    <setting name=”threshold”>20 ms</setting>
  </event>
  <event name=”jdk.VirtualThreadSubmitFailed”>
    <setting name=”enabled”>true</setting>
    <setting name=”stackTrace”>true</setting>
  </event>
</configuration>

Next, let’s consider the following code (basically, this is the application from Problem 216):

public static TestingTeam buildTestingTeam()
       throws InterruptedException, ExecutionException {
      
  try (ShutdownOnSuccess scope
    = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    Stream.of(1, 2, 3)
      .<Callable<String>>map(id -> () -> fetchTester(id))
      .forEach(scope::fork);
    scope.join();
    String result = (String) scope.result();
    logger.info(result);
          
    return new TestingTeam(result);
  }
}

Next, we use -XX:StartFlightRecording=filename=recording.jfr to instruct JFR to record output in a file name recording.jfr, and we continue with settings=vtEvent.jfc to point out the configuration file listed previously.So, the final command is the one from this figure:

Figure 10.10 – Running JFR

JFR has produced a file named recording.jfr. We can easily view the content of this file via JFR CLI. The command (jfr print recording.jfr) will display the content of recording.jfr. The content is quite larger to be listed here (it contains three entries for jdk.VirtualThreadStart and three for jdk.VirtualThreadEnd) but here is the event specific to starting a virtual thread:

Figure 10.11 – JFR event for starting a virtual thread