223. Combining StructuredTaskScope and streams
If you prefer functional programming, you’ll be happy to see that streams work with StructuredTaskScope as well. For instance, here we rewrite the application from Problem 214 using a stream pipeline to fork our tasks:
public static TestingTeam buildTestingTeam()
    throws InterruptedException, ExecutionException {
  try (ShutdownOnSuccess<String> scope
      = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    // the type witness tells the compiler that the inner lambda is a Callable<String>
    Stream.of(1, 2, 3)
      .<Callable<String>>map(id -> () -> fetchTester(id))
      .forEach(scope::fork);
    scope.join();
    // result of the first subtask that completed successfully
    String result = scope.result();
    logger.info(result);
    return new TestingTeam(result);
  }
}
Moreover, we can use stream pipelines to collect results and exceptions as follows:
public static TestingTeam buildTestingTeam()
    throws InterruptedException, ExecutionException {
  try (ShutdownOnSuccess<String> scope
      = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    // fork one subtask per id and keep the returned handles
    List<Subtask<String>> subtasks = Stream.of(Integer.MAX_VALUE, 2, 3)
      .<Callable<String>>map(id -> () -> fetchTester(id))
      .map(scope::fork)
      .toList();
    scope.join();
    // collect the exceptions of the failed subtasks
    List<Throwable> failed = subtasks.stream()
      .filter(f -> f.state() == Subtask.State.FAILED)
      .map(Subtask::exception)
      .toList();
    logger.info(failed.toString());
    // collect the results of the successful subtasks
    TestingTeam result = subtasks.stream()
      .filter(f -> f.state() == Subtask.State.SUCCESS)
      .map(Subtask::get)
      .collect(collectingAndThen(toList(),
        list -> new TestingTeam(list.toArray(String[]::new))));
    logger.info(result.toString());
    return result;
  }
}
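The filter-by-state pipeline can also be tried without preview features. The following is only a sketch that swaps StructuredTaskScope for a virtual-thread-per-task executor and Future.state() (final APIs as of JDK 21), so it illustrates the stream pattern rather than structured concurrency itself. The class name StreamForkSketch, the Outcome record, and this fetchTester() stand-in (which fails for ids above 100) are assumptions made for illustration, not the book's code.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Stream;

class StreamForkSketch {

    // Hypothetical stand-in for fetchTester(): ids above 100 are treated as unknown.
    static String fetchTester(int id) {
        if (id > 100) {
            throw new IllegalArgumentException("No tester with id " + id);
        }
        return "tester-" + id;
    }

    // Successful results and failures, each in the order the ids were given.
    record Outcome(List<String> testers, List<Throwable> failures) {}

    // Submits one task per id; leaving the try block calls close(), which waits for all tasks.
    static List<Future<String>> submitAll(Integer... ids) {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            return Stream.of(ids)
                .<Callable<String>>map(id -> () -> fetchTester(id))
                .map(task -> executor.submit(task))
                .toList();
        }
    }

    static Outcome fetchAll(Integer... ids) {
        List<Future<String>> futures = submitAll(ids);

        // Every future is done at this point, so state() is SUCCESS or FAILED.
        List<String> testers = futures.stream()
            .filter(f -> f.state() == Future.State.SUCCESS)
            .map(Future::resultNow)
            .toList();

        List<Throwable> failures = futures.stream()
            .filter(f -> f.state() == Future.State.FAILED)
            .map(Future::exceptionNow)
            .toList();

        return new Outcome(testers, failures);
    }

    public static void main(String[] args) {
        Outcome outcome = fetchAll(Integer.MAX_VALUE, 2, 3);
        System.out.println("Succeeded: " + outcome.testers());
        System.out.println("Failed: " + outcome.failures());
    }
}
```

Unlike ShutdownOnSuccess, this executor does not cancel the remaining tasks after the first success, so every task runs to completion before the results are split.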
You can find these examples in the bundled code.
224. Observing and monitoring virtual threads
Observing and monitoring virtual threads can be done in several ways. First, we can use JFR (Java Flight Recorder), a tool that we introduced in Chapter 6, Problem 136.
Using JFR
Among its rich list of events, JFR can monitor and record the following events related to virtual threads:
jdk.VirtualThreadStart – this event is recorded when a virtual thread starts (disabled by default)
jdk.VirtualThreadEnd – this event is recorded when a virtual thread ends (disabled by default)
jdk.VirtualThreadPinned – this event is recorded when a virtual thread is parked while pinned (enabled by default, with a threshold of 20 ms)
jdk.VirtualThreadSubmitFailed – this event is recorded when a virtual thread cannot be started or unparked (enabled by default)
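To check which of these events the running JDK actually registers, the JFR API can be queried directly. This is a minimal sketch that assumes a JDK 21 or later runtime with the jdk.jfr module available; the class name VirtualThreadEventTypes and the eventNames() helper are made up for this illustration.

```java
import java.util.List;
import jdk.jfr.EventType;
import jdk.jfr.FlightRecorder;

class VirtualThreadEventTypes {

    // Returns the sorted names of all registered event types that start with the given prefix.
    static List<String> eventNames(String prefix) {
        return FlightRecorder.getFlightRecorder().getEventTypes().stream()
            .map(EventType::getName)
            .filter(name -> name.startsWith(prefix))
            .sorted()
            .toList();
    }

    public static void main(String[] args) {
        // Prints one event name per line
        eventNames("jdk.VirtualThread").forEach(System.out::println);
    }
}
```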
You can find all the JFR events at https://sap.github.io/SapMachine/jfrevents/.
We start configuring JFR for monitoring virtual threads by adding the following vtEvent.jfc file to the root folder of the application:
<?xml version="1.0" encoding="UTF-8"?>
<configuration version="2.0" description="test">
  <event name="jdk.VirtualThreadStart">
    <setting name="enabled">true</setting>
    <setting name="stackTrace">true</setting>
  </event>
  <event name="jdk.VirtualThreadEnd">
    <setting name="enabled">true</setting>
  </event>
  <event name="jdk.VirtualThreadPinned">
    <setting name="enabled">true</setting>
    <setting name="stackTrace">true</setting>
    <setting name="threshold">20 ms</setting>
  </event>
  <event name="jdk.VirtualThreadSubmitFailed">
    <setting name="enabled">true</setting>
    <setting name="stackTrace">true</setting>
  </event>
</configuration>
Next, let’s consider the following code (basically, this is the application from Problem 216):
public static TestingTeam buildTestingTeam()
    throws InterruptedException, ExecutionException {
  try (ShutdownOnSuccess<String> scope
      = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    // each forked subtask runs in its own virtual thread
    Stream.of(1, 2, 3)
      .<Callable<String>>map(id -> () -> fetchTester(id))
      .forEach(scope::fork);
    scope.join();
    // result of the first subtask that completed successfully
    String result = scope.result();
    logger.info(result);
    return new TestingTeam(result);
  }
}
Next, we use -XX:StartFlightRecording=filename=recording.jfr to instruct JFR to write its output to a file named recording.jfr, and we append settings=vtEvent.jfc, separated by a comma, to point to the configuration file listed previously (that is, -XX:StartFlightRecording=filename=recording.jfr,settings=vtEvent.jfc).
So, the final command is the one from this figure:

Figure 10.10 – Running JFR
JFR has produced a file named recording.jfr. We can easily view its content via the JFR CLI: the jfr print recording.jfr command displays the content of recording.jfr. The output is too large to list here (it contains three entries for jdk.VirtualThreadStart and three for jdk.VirtualThreadEnd), but here is the event specific to starting a virtual thread:

Figure 10.11 – JFR event for starting a virtual thread
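The same settings can also be applied, and the recording inspected, from Java code through the jdk.jfr API instead of a .jfc file and the jfr CLI. The following is a rough sketch under stated assumptions: it requires JDK 21 or later, it starts plain virtual threads with empty tasks instead of running buildTestingTeam(), and the class name VirtualThreadRecordingSketch and the recordAndCount() helper are invented for this example.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import jdk.jfr.Recording;
import jdk.jfr.consumer.RecordingFile;

class VirtualThreadRecordingSketch {

    // Records the given number of short-lived virtual threads and
    // returns how many virtual thread events of each type were captured.
    static Map<String, Long> recordAndCount(int threads)
            throws IOException, InterruptedException {
        Path file = Files.createTempFile("recording", ".jfr");
        try (Recording recording = new Recording()) {
            // Mirrors the settings from vtEvent.jfc
            recording.enable("jdk.VirtualThreadStart").withStackTrace();
            recording.enable("jdk.VirtualThreadEnd");
            recording.enable("jdk.VirtualThreadPinned")
                .withStackTrace()
                .withThreshold(Duration.ofMillis(20));
            recording.enable("jdk.VirtualThreadSubmitFailed").withStackTrace();
            recording.start();

            // Stand-in workload: virtual threads that do nothing
            List<Thread> started = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                started.add(Thread.startVirtualThread(() -> { }));
            }
            for (Thread thread : started) {
                thread.join();
            }

            recording.stop();
            recording.dump(file);
        }
        try {
            // Programmatic counterpart of "jfr print": read the events back and count them by name
            return RecordingFile.readAllEvents(file).stream()
                .map(event -> event.getEventType().getName())
                .filter(name -> name.startsWith("jdk.VirtualThread"))
                .collect(Collectors.groupingBy(name -> name, Collectors.counting()));
        } finally {
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(recordAndCount(3));
    }
}
```

Enabling events in code keeps the configuration next to the workload, while the .jfc file has the advantage that it can be changed without recompiling.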