Combining StructuredTaskScope and streams 3 – Concurrency – Virtual Threads, Structured Concurrency

Running 10000 tasks via fixed thread pool executor

Depending on your machine, the previous test may finish successfully or it may result in an OutOfMemoryError. We can avoid this unpleasant scenario by using a fixed thread pool. For instance, let’s limit the number of platform threads to 200 via the following snippet of code:

long start = System.currentTimeMillis();
       
try (ExecutorService executorFixed
    = Executors.newFixedThreadPool(200)) {
  IntStream.range(0, 10_000).forEach(i -> {
    executorFixed.submit(() -> {
      Thread.sleep(Duration.ofSeconds(1));
      logger.info(() -> “Task: ” + i);
      return i;
    });
  });
}
logger.info(() -> “Time (ms): “
  + (System.currentTimeMillis() – start));

On my machine, it took 50190 ms (50 seconds) to run these 10000 tasks using at peak 216 platform threads. The following screenshot from JMX reveals this information:

Figure 10.15 – Running 10000 tasks via fixed thread pool executor

Obviously, a smaller number of platform threads is reflected in performance. If we put 216 workers to do the job of 7729 workers it will take longer for sure. Next, let’s see how virtual threads will handle this challenge.

Running 10000 tasks via virtual thread per task executor

This time, let’s see how the newVirtualThreadPerTaskExecutor() can handle these 10000 tasks. The code is straightforward:

long start = System.currentTimeMillis();
      
try (ExecutorService executorVirtual
      = Executors.newVirtualThreadPerTaskExecutor()) {
  IntStream.range(0, 10_000).forEach(i -> {
    executorVirtual.submit(() -> {
      Thread.sleep(Duration.ofSeconds(1));
      logger.info(() -> “Task: ” + i);
      return i;
    });
  });
}
      
logger.info(() -> “Time (ms): “
  + (System.currentTimeMillis() – start));

On my machine, it took 3519 ms (3.5 seconds) to run these 10000 tasks using at peak 25 platform threads. The following screenshot from JMX reveals this information:

Figure 10.16 – Running 10000 tasks via virtual thread per task executor

Wooow! How cool is this?! The resulting time is far away the best in comparison with the previous tests and it uses fewer resources (only 25 platform threads). So, virtual threads really rock!I also strongly recommend you check out the following benchmark: https://github.com/colincachia/loom-benchmark/tree/main.Starting with JDK 21, the JMX’s HotSpotDiagnosticMXBean was enriched with the dumpThreads(String outputFile, ThreadDumpFormat format) method. This method outputs a thread dump to the given file (outputFile) in the given format (format). The thread dump will contain all platform threads but it may contain some or all virtual threads.In the following code, we attempt to obtain a thread dump for all subtasks (threads) of a StructuredTaskScope:

try (ShutdownOnSuccess scope
  = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
  Stream.of(1, 2, 3)
        .<Callable<String>>map(id -> () -> fetchTester(id))
        .forEach(scope::fork);
  HotSpotDiagnosticMXBean mBean = ManagementFactory
    .getPlatformMXBean(HotSpotDiagnosticMXBean.class);
  mBean.dumpThreads(Path.of(“dumpThreads.json”)
    .toAbsolutePath().toString(),                    
  HotSpotDiagnosticMXBean.ThreadDumpFormat.JSON);
  scope.join();
  String result = (String) scope.result();
  logger.info(result);
}

The output file is named threadDump.json and you can find it in the root folder of the application. The part of the output that we are interested in is partially listed here:


{
  “container”: “java.util.concurrent
             .StructuredTaskScope$ShutdownOnSuccess@6d311334″,
  “parent”: “<root>”,
  “owner”: “1”,
  “threads”: [
    {
    “tid”: “22”
    “name”: “”,
    “stack”: [
      …
      “java.base\/java.lang.VirtualThread
       .run(VirtualThread.java:311)”
      ]
    },
    {
    “tid”: “24”,
    “name”: “”,
    “stack”: [
      …
      “java.base\/java.lang.VirtualThread
        .run(VirtualThread.java:311)”
      ]
    },
    {
    “tid”: “25”,
    “name”: “”,
    “stack”: [
      …
      “java.base\/java.lang.VirtualThread
        .run(VirtualThread.java:311)”
      ]
    }
  ],
  “threadCount”: “3”
}

As you can see, we have three virtual threads (#22, #24, and #25) that run subtasks of our scope. In the bundled code, you can find the complete output.

Summary

This chapter covered 16 introductory problems about virtual threads and structured concurrency. You can see this chapter as the preparation for the next one, which will cover more detailed aspects of these two topics.

Combining StructuredTaskScope and streams – Concurrency – Virtual Threads, Structured Concurrency

223. Combining StructuredTaskScope and streams

If you prefer functional programming then you’ll be happy to see that streams can be used with StructuredTaskScope as well. For instance, here we re-write the application from Problem 214 using a stream pipeline for forking our tasks:

public static TestingTeam buildTestingTeam()
       throws InterruptedException, ExecutionException {
      
  try (ShutdownOnSuccess scope
      = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    Stream.of(1, 2, 3)
      .<Callable<String>>map(id -> () -> fetchTester(id))
      .forEach(scope::fork);
    scope.join();
    String result = (String) scope.result();
    logger.info(result);
         
    return new TestingTeam(result);
  }
}

Moreover, we can use stream pipelines to collect results and exceptions as follows:

public static TestingTeam buildTestingTeam()
    throws InterruptedException, ExecutionException {
 try (ShutdownOnSuccess scope
  = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
  List<Subtask> subtasks = Stream.of(Integer.MAX_VALUE, 2, 3)
   .<Callable<String>>map(id -> () -> fetchTester(id))
   .map(scope::fork)
   .toList();
  scope.join();
  List<Throwable> failed = subtasks.stream()
   .filter(f -> f.state() == Subtask.State.FAILED)
   .map(Subtask::exception)
   .toList();
  logger.info(failed.toString());
  TestingTeam result = subtasks.stream()
   .filter(f -> f.state() == Subtask.State.SUCCESS)
   .map(Subtask::get)                                     
   .collect(collectingAndThen(toList(),
     list -> { return new TestingTeam(list.toArray(
       String[]::new)); }));
  logger.info(result.toString());
           
  return result;
  }
}

You can find these examples in the bundled code.

224. Observing and monitoring virtual threads

Observing and monitoring virtual threads can be done in several ways. First, we can use JFR (Java Flight Recorder) – we have introduced this tool in Chapter 6, Problem 136.

Using JFR

Among its reach list of events, JFR can monitor and record the following events related to virtual threads:

jdk.VirtualThreadStart – this event is recorded when a virtual thread starts (by default is disabled)

jdk.VirtualThreadEnd – this event is recorded when a virtual thread ends (by default is disabled)

jdk.VirtualThreadPinned – this event is recorded when a virtual thread was parked while pinned (by default is enabled with a threshold of 20ms).

jdk.VirtualThreadSubmitFailed – this event is recorded if a virtual thread cannot be started or unparked (by default is enabled)

You can find all the JFR events at https://sap.github.io/SapMachine/jfrevents/.We start configuring JFR for monitoring the virtual threads by adding in the root folder of the application the following vtEvent.jfc file:

<?xml version=”1.0″ encoding=”UTF-8″?>
<configuration version=”2.0″ description=”test”>
  <event name=”jdk.VirtualThreadStart”>
    <setting name=”enabled”>true</setting>
    <setting name=”stackTrace”>true</setting>
  </event>
  <event name=”jdk.VirtualThreadEnd”>
    <setting name=”enabled”>true</setting>
  </event>
  <event name=”jdk.VirtualThreadPinned”>
    <setting name=”enabled”>true</setting>
    <setting name=”stackTrace”>true</setting>
    <setting name=”threshold”>20 ms</setting>
  </event>
  <event name=”jdk.VirtualThreadSubmitFailed”>
    <setting name=”enabled”>true</setting>
    <setting name=”stackTrace”>true</setting>
  </event>
</configuration>

Next, let’s consider the following code (basically, this is the application from Problem 216):

public static TestingTeam buildTestingTeam()
       throws InterruptedException, ExecutionException {
      
  try (ShutdownOnSuccess scope
    = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    Stream.of(1, 2, 3)
      .<Callable<String>>map(id -> () -> fetchTester(id))
      .forEach(scope::fork);
    scope.join();
    String result = (String) scope.result();
    logger.info(result);
          
    return new TestingTeam(result);
  }
}

Next, we use -XX:StartFlightRecording=filename=recording.jfr to instruct JFR to record output in a file name recording.jfr, and we continue with settings=vtEvent.jfc to point out the configuration file listed previously.So, the final command is the one from this figure:

Figure 10.10 – Running JFR

JFR has produced a file named recording.jfr. We can easily view the content of this file via JFR CLI. The command (jfr print recording.jfr) will display the content of recording.jfr. The content is quite larger to be listed here (it contains three entries for jdk.VirtualThreadStart and three for jdk.VirtualThreadEnd) but here is the event specific to starting a virtual thread:

Figure 10.11 – JFR event for starting a virtual thread