Introducing structured concurrency 2 – Concurrency – Virtual Threads, Structured Concurrency

We skip the rest of the code since you can find it in the bundled code.

Of course, we can implement code-answers to each of these questions via error handling, task abandonment and abortion, ExecutorService, and so on, but this means a lot of work for the developer. Writing failsafe solutions that carefully cover all possible scenarios across multiple tasks/subtasks while tracking their progress in a concurrent environment is not an easy job. Not to mention how hard it is for another developer, or even the same developer after 1-2 years or even months, to understand and maintain the resulting code.

It is time to add some structure to this code, so let’s introduce structured concurrency (developed under Project Loom).

Structured concurrency relies on several pillars meant to bring lightweight concurrency to Java. The fundamental pillar or principle of structured concurrency is highlighted next.

The fundamental principle of structured concurrency: When a task has to be solved concurrently then all the threads needed to solve it are spun and rejoined in the same block of code. In other words, all these threads’ lifetime is bound to the block’s scope, so we have clear and explicit entry-exit points for each concurrent code block.

Based on this principle, the thread that initiates a concurrent context is the parent-thread or the owner-thread. All threads started by the parent-thread are child-threads or forks, so, among themselves, these threads are siblings. Together, the parent-thread and the child-threads define a parent-child hierarchy.

The structured concurrency principle can be depicted in a diagram as follows:

Figure 10.2 – Parent-child hierarchy in structured concurrency

In the context of the parent-child hierarchy, we have support for error/exception handling with short-circuiting, cancellation propagation, and monitoring/observability.

Error/exception handling with short-circuiting: If a child-thread fails then all the other child-threads are canceled unless they have already completed. For instance, if futureTester(1) fails, then futureTester(2) and futureTester(3) are automatically canceled.

Cancellation propagation: If the parent-thread is interrupted before joining the child-threads is over then these forks (the child-threads/subtasks) are canceled automatically. For instance, if the thread executing buildTestingTeam() gets interrupted then its three forks are automatically canceled.

Monitoring/observability: A thread dump clearly reveals the entire parent-child hierarchy no matter how many levels have been spawned. Moreover, in structured concurrency, we take advantage of scheduling and memory management of threads.

While these are purely concepts, writing code that respects and follows these concepts requires the proper API and the following awesome callout:

Figure 10.3 – Don’t reuse virtual threads

Cut this out and stick it somewhere to see it every day! So, in structured concurrency, don’t reuse virtual threads. I know what you are thinking: hey dude, threads are expensive and limited, so we have to reuse them. A quick hint: we are talking about virtual threads (massive throughput), not classical threads, but this topic is covered in the next problem.
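To make the principle more tangible, here is a minimal sketch that approximates it with stable JDK 21 APIs only. It is not the StructuredTaskScope API; it merely binds the lifetime of three freshly created virtual threads to a try-with-resources block and relies on ExecutorService#close() to wait for them. The class name ScopedForksSketch, the loadTester() helper, and the returned strings are hypothetical, and the sketch offers neither short-circuiting nor cancellation propagation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ScopedForksSketch {

  // Hypothetical stand-in for a remote call such as fetchTester(id)
  static String loadTester(int id) {
    return "tester-" + id;
  }

  static List<String> buildTeam() {
    List<Future<String>> forks = new ArrayList<>();

    // Entry point: every fork gets its own, never reused, virtual thread
    try (ExecutorService scope
        = Executors.newVirtualThreadPerTaskExecutor()) {
      for (int id = 1; id <= 3; id++) {
        int testerId = id;
        forks.add(scope.submit(() -> loadTester(testerId)));
      }
    } // Exit point: close() waits until all submitted tasks have finished

    // Past the block, no fork is still running
    List<String> team = new ArrayList<>();
    for (Future<String> fork : forks) {
      team.add(fork.resultNow()); // the tasks above cannot fail
    }
    return team;
  }

  public static void main(String[] args) {
    System.out.println(buildTeam());
  }
}
```

The point of the sketch is only the shape of the code: the forks are started and rejoined in the same block, so nothing outlives it.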

Introducing structured concurrency – Concurrency – Virtual Threads, Structured Concurrency

210. Introducing structured concurrency

If you are as old as I am then most probably you started programming with a language such as BASIC or a similar unstructured programming language. At that time, an application was just a sequence of lines that defined a sequential logic/behavior via a bunch of GOTO statements that drove the flow by jumping like a kangaroo back and forth between the code lines. Well, in Java, the building blocks of typical concurrent code are so primitive that the code looks somewhat like unstructured programming because it is hard to follow and understand what’s going on. Moreover, a thread dump of a concurrent task doesn’t provide the needed answers.

Let’s follow a snippet of Java concurrent code and let’s stop every time we have a question (always check the code below the question). The task is to concurrently load three testers by id and team them up in a testing team. First, let’s list here the server code (we will use this simple code to serve us in this problem and subsequent problems):

public static String fetchTester(int id)
      throws IOException, InterruptedException {
  HttpClient client = HttpClient.newHttpClient();
  HttpRequest requestGet = HttpRequest.newBuilder()
    .GET()
    .uri(URI.create("https://reqres.in/api/users/" + id))
    .build();
  HttpResponse<String> responseGet = client.send(
    requestGet, HttpResponse.BodyHandlers.ofString());
  if (responseGet.statusCode() == 200) {
    return responseGet.body();
  }
  throw new UserNotFoundException("Code: "
    + responseGet.statusCode());
}

Next, the code that we are especially interested in starts as follows:

private static final ExecutorService executor
  = Executors.newFixedThreadPool(2);
public static TestingTeam buildTestingTeam()
   throws InterruptedException {
  …

First stop: As you can see, buildTestingTeam() throws an InterruptedException. So, if the thread executing buildTestingTeam() gets interrupted, how can we easily interrupt the following threads?

  Future<String> future1 = futureTester(1);
  Future<String> future2 = futureTester(2);
  Future<String> future3 = futureTester(3);
 
  try {
    …

Second stop: Here we have three get() calls. So, the current thread waits for other threads to complete. Can we easily observe those threads?

    String tester1 = future1.get();
    String tester2 = future2.get();
    String tester3 = future3.get();
    logger.info(tester1);
    logger.info(tester2);
    logger.info(tester3);
    return new TestingTeam(tester1, tester2, tester3);
  } catch (ExecutionException ex) {
    …

Third stop: If an ExecutionException is caught then we know that one of these three Future instances has failed. Can we easily cancel the remaining two, or will they just hang in there? Probably future1 will fail while future2 and future3 will complete successfully, or maybe future2 will complete successfully while future3 will just run forever (a so-called orphan thread). This may lead to serious mismatches in the expected results, memory leaks, and so on.

    throw new RuntimeException(ex);
  } finally {
    …

Fourth stop: The next line of code is used to shut down the executor, but it is so easy to overlook. Is this the proper place to do this?

    shutdownExecutor(executor);
  }
}

Fifth stop: If you didn’t spot the previous line of code then it is legitimate to ask yourself how/where this executor gets shut down.

public static Future<String> futureTester(int id) {
  return executor.submit(() -> fetchTester(id));
}
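To get a feel for the manual work behind the third stop, here is a minimal sketch of canceling the sibling tasks by hand once the first one fails. It does not call the reqres.in server; the failing and slow tasks, the class name ManualCancellationSketch, and the pool size of 3 are assumptions made only for illustration.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ManualCancellationSketch {

  // Returns how many sibling tasks were canceled after the first one failed
  static int cancelSiblingsOnFailure() {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    try {
      // Hypothetical tasks: the first fails fast, the others are slow
      Callable<String> failingTask = () -> {
        throw new IllegalStateException("tester 1 not found");
      };
      Callable<String> slowTask = () -> {
        Thread.sleep(5_000); // simulates a slow remote call
        return "tester";
      };

      Future<String> future1 = executor.submit(failingTask);
      Future<String> future2 = executor.submit(slowTask);
      Future<String> future3 = executor.submit(slowTask);

      try {
        future1.get();
        return 0; // not reached here, since future1 always fails
      } catch (ExecutionException ex) {
        // Nobody does this for us: cancel the siblings one by one
        int canceled = 0;
        for (Future<String> sibling : List.of(future2, future3)) {
          if (sibling.cancel(true)) {
            canceled++;
          }
        }
        return canceled;
      } catch (InterruptedException ex) {
        // The parent was interrupted: propagate cancellation manually
        Thread.currentThread().interrupt();
        future2.cancel(true);
        future3.cancel(true);
        return 0;
      }
    } finally {
      executor.shutdownNow(); // easy to forget, as the fourth stop notes
    }
  }

  public static void main(String[] args) {
    System.out.println("Canceled siblings: " + cancelSiblingsOnFailure());
  }
}
```

Even in this tiny form, the failure path, the interruption path, and the executor shutdown each need their own explicit handling, which is exactly the burden described above.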

Combining StructuredTaskScope and streams 2 – Concurrency – Virtual Threads, Structured Concurrency

And, in the next figure, you can see the event recorded for ending this virtual thread:

Figure 10.12 – JFR event for ending a virtual thread

Besides the JFR CLI, you can use more powerful tools for consuming the virtual thread events such as JDK Mission Control (https://www.oracle.com/java/technologies/jdk-mission-control.html) and the well-known Advanced Management Console (https://www.oracle.com/java/technologies/advancedmanagementconsole.html).

To get a stack trace for threads that block while pinned, we can set the system property jdk.tracePinnedThreads. A complete (verbose) stack trace is available via -Djdk.tracePinnedThreads=full. Or, if all you need is a brief/short stack trace, rely on -Djdk.tracePinnedThreads=short.

In our example, we can easily get a pinned virtual thread by marking the fetchTester() method as synchronized (remember that a virtual thread cannot be unmounted if it runs code inside a synchronized method/block):

public static synchronized String fetchTester(int id)
    throws IOException, InterruptedException {
  …
}

In this context, JFR will record a pinned virtual thread as in the following figure:

Figure 10.13 – JFR event for a pinned virtual thread

If we run the application with -Djdk.tracePinnedThreads=full then your IDE will print a detailed stack trace that starts as follows:

Thread[#26,ForkJoinPool-1-worker-1,5,CarrierThreads]
    java.base/java.lang.VirtualThread$VThreadContinuation.onPinned(VirtualThread.java:183)

You can see the complete output by executing the bundled code. Of course, you can get a thread dump and analyze it via several other tools. You may prefer any of jstack, JDK Mission Control (JMC), jvisualvm, or jcmd. For instance, we can obtain a thread dump in plain text or JSON format via jcmd as follows:

jcmd <PID> Thread.dump_to_file -format=text <file>
jcmd <PID> Thread.dump_to_file -format=json <file>

Next, let’s play with jconsole (JMX) to quickly analyze the performance of virtual threads.

Using Java Management Extensions (JMX)

Until JDK 20 (inclusive), JMX provides support for monitoring only the platform threads. But, we can still use JMX to observe the performance brought by virtual threads in comparison with platform threads.

For instance, we can use JMX to monitor platform threads every 500 ms via the following snippet of code:

ScheduledExecutorService scheduledExecutor
      = Executors.newScheduledThreadPool(1);
scheduledExecutor.scheduleAtFixedRate(() -> {
  ThreadMXBean threadBean
    = ManagementFactory.getThreadMXBean();
  ThreadInfo[] threadInfo
    = threadBean.dumpAllThreads(false, false);
  logger.info(() -> "Platform threads: " + threadInfo.length);
}, 500, 500, TimeUnit.MILLISECONDS);

We rely on this code in the following three scenarios.

Running 10000 tasks via cached thread pool executor

Next, let’s add a snippet of code that runs 10000 tasks via newCachedThreadPool() and platform threads. We also measure the time elapsed to execute these tasks:

long start = System.currentTimeMillis();
      
try (ExecutorService executorCached
    = Executors.newCachedThreadPool()) {
  IntStream.range(0, 10_000).forEach(i -> {
    executorCached.submit(() -> {
      Thread.sleep(Duration.ofSeconds(1));
      logger.info(() -> "Task: " + i);
      return i;
    });
  });
}
      
logger.info(() -> "Time (ms): "
  + (System.currentTimeMillis() - start));

On my machine, it took 8147 ms (8 seconds) to run these 10000 tasks using at peak 7729 platform threads. The following screenshot from jconsole (JMX) reveals this information:

Figure 10.14 – Running 10000 tasks via cached thread pool executor
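The peak number of platform threads can also be read programmatically instead of from the jconsole screenshot. The following is a minimal sketch, assuming that ThreadMXBean#getPeakThreadCount() and resetPeakThreadCount() are good enough for a rough measurement; the class name PeakThreadsSketch and the number of extra threads are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

class PeakThreadsSketch {

  // Starts 'extra' platform threads and returns how much the peak grew
  static int peakGrowth(int extra) {
    ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
    threadBean.resetPeakThreadCount(); // peak restarts from the live count
    int peakBefore = threadBean.getPeakThreadCount();

    CountDownLatch started = new CountDownLatch(extra);
    CountDownLatch release = new CountDownLatch(1);
    for (int i = 0; i < extra; i++) {
      new Thread(() -> {
        started.countDown();
        try {
          release.await(); // stay alive until the peak has been read
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }).start();
    }

    try {
      started.await(); // all extra threads are running now
      return threadBean.getPeakThreadCount() - peakBefore;
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      return 0;
    } finally {
      release.countDown(); // let the extra threads terminate
    }
  }

  public static void main(String[] args) {
    System.out.println("Peak grew by: " + peakGrowth(50));
  }
}
```

The reported growth is usually close to the number of extra threads, but unrelated JVM threads starting or stopping in the meantime can shift it slightly.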

Next, let’s repeat this test via a fixed thread pool.

Introducing virtual threads 3 – Concurrency – Virtual Threads, Structured Concurrency

Waiting for a virtual task to terminate

The given task is executed by a virtual thread, while the main thread is not blocked. In order to wait for the virtual thread to terminate we have to call one of the join() flavors. We have join() without arguments that waits indefinitely, and a few flavors that wait for a given time (for instance, join(Duration duration), join(long millis)):

vThread.join();

These methods throw an InterruptedException so you have to catch it and handle it or just throw it. Now, because of join(), the main thread cannot terminate before the virtual thread. It has to wait until the virtual thread completes.

Creating an unstarted virtual thread

Creating an unstarted virtual thread can be done via unstarted(Runnable task) as follows:

Thread vThread = Thread.ofVirtual().unstarted(task);

Or, via Thread.Builder as follows:

Thread.Builder builder = Thread.ofVirtual();
Thread vThread = builder.unstarted(task);

This time, the thread is not scheduled for execution. It will be scheduled for execution only after we explicitly call the start() method:

vThread.start();

We can check if a thread is alive (it was started but not terminated) via the isAlive() method:

boolean isalive = vThread.isAlive();

The unstarted() method is available for platform threads as well (there is also the Thread.Builder.OfPlatform subinterface):

Thread pThread = Thread.ofPlatform().unstarted(task);

We can start pThread by calling the start() method.
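As a quick check of the lifecycle described above, the following sketch records isAlive() before start(), while the task is still running, and after join(). The latch that keeps the task running and the class name UnstartedThreadSketch are assumptions added only to make the three moments observable.

```java
import java.util.concurrent.CountDownLatch;

class UnstartedThreadSketch {

  // Returns isAlive() before start(), while running, and after join()
  static boolean[] aliveFlags() {
    CountDownLatch finish = new CountDownLatch(1);

    Thread vThread = Thread.ofVirtual().unstarted(() -> {
      try {
        finish.await(); // keep the task running until we say so
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    });

    boolean beforeStart = vThread.isAlive(); // not scheduled yet

    vThread.start();
    boolean whileRunning = vThread.isAlive(); // started, not terminated

    finish.countDown();
    try {
      vThread.join(); // wait for the virtual thread to terminate
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }
    boolean afterJoin = vThread.isAlive();

    return new boolean[] {beforeStart, whileRunning, afterJoin};
  }

  public static void main(String[] args) {
    System.out.println(java.util.Arrays.toString(aliveFlags()));
  }
}
```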

Creating a ThreadFactory for virtual threads

You can create a ThreadFactory of virtual threads as follows:

ThreadFactory tfVirtual = Thread.ofVirtual().factory();
// or, a factory that also names the created threads
ThreadFactory tfVirtual = Thread.ofVirtual()
  .name("vt-", 0).factory(); // 'vt-' name prefix, 0 counter

Or, via Thread.Builder as follows:

Thread.Builder builder = Thread.ofVirtual().name("vt-", 0);
ThreadFactory tfVirtual = builder.factory();

And, a ThreadFactory for platform threads as follows (you can use Thread.Builder as well):

ThreadFactory tfPlatform = Thread.ofPlatform()
  .name("pt-", 0).factory(); // 'pt-' name prefix, 0 counter

Or, a ThreadFactory that we can use to switch between virtual/platform threads as follows:

static class SimpleThreadFactory implements ThreadFactory {
  @Override
  public Thread newThread(Runnable r) {
    // return new Thread(r);                // platform thread
    return Thread.ofVirtual().unstarted(r); // virtual thread
  }
}

Next, we can use any of these factories via the ThreadFactory#newThread(Runnable task) as follows:

tfVirtual.newThread(task).start();
tfPlatform.newThread(task).start();
SimpleThreadFactory stf = new SimpleThreadFactory();
stf.newThread(task).start();

If the thread factory starts the created thread as well then there is no need to explicitly call the start() method.
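To see what the name prefix and counter do, here is a small sketch that asks a named factory for two threads and returns their names. With the vt- prefix and a counter starting at 0, the names should come out as vt-0 and vt-1. The class name NamedFactorySketch is hypothetical.

```java
import java.util.concurrent.ThreadFactory;

class NamedFactorySketch {

  // Returns the names of the first two threads created by the factory
  static String[] firstTwoNames() {
    ThreadFactory tfVirtual = Thread.ofVirtual()
      .name("vt-", 0).factory(); // 'vt-' name prefix, 0 counter

    // The factory hands out unstarted threads; start() is still our job
    Thread first = tfVirtual.newThread(() -> {});
    Thread second = tfVirtual.newThread(() -> {});

    return new String[] {first.getName(), second.getName()};
  }

  public static void main(String[] args) {
    for (String name : firstTwoNames()) {
      System.out.println(name);
    }
  }
}
```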

Checking a virtual thread’s details

Moreover, we can check if a certain thread is a platform thread or a virtual thread via isVirtual():

Thread vThread = Thread.ofVirtual()
  .name("my_vThread").unstarted(task);
Thread pThread1 = Thread.ofPlatform()
  .name("my_pThread").unstarted(task);
Thread pThread2 = new Thread(() -> {});
logger.info(() -> "Is vThread virtual ? "
  + vThread.isVirtual());  // true
logger.info(() -> "Is pThread1 virtual ? "
  + pThread1.isVirtual()); // false
logger.info(() -> "Is pThread2 virtual ? "
  + pThread2.isVirtual()); // false

Obviously, only vThread is a virtual thread.

A virtual thread always runs as a daemon thread. The isDaemon() method returns true, and trying to call setDaemon(false) will throw an exception.

The priority of a virtual thread is always NORM_PRIORITY (calling getPriority() always returns 5, the int constant for NORM_PRIORITY). Calling setPriority() with a different value has no effect.

A virtual thread cannot be part of a thread group because it already belongs to the VirtualThreads group. Calling getThreadGroup().getName() returns VirtualThreads.

A virtual thread has no permissions when running with a Security Manager (which is deprecated anyway).
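These traits can be verified with a few lines of code. The following is a minimal sketch that inspects an unstarted virtual thread; the class name VirtualThreadTraitsSketch and the format of the returned string are hypothetical.

```java
class VirtualThreadTraitsSketch {

  // Collects the fixed traits of a virtual thread into one line of text
  static String describe() {
    Thread vThread = Thread.ofVirtual().unstarted(() -> {});

    boolean rejected = false;
    try {
      vThread.setDaemon(false); // not allowed for virtual threads
    } catch (IllegalArgumentException ex) {
      rejected = true;
    }

    vThread.setPriority(Thread.MAX_PRIORITY); // silently ignored

    return "daemon=" + vThread.isDaemon()
      + ", setDaemon(false) rejected=" + rejected
      + ", priority=" + vThread.getPriority()
      + ", group=" + vThread.getThreadGroup().getName();
  }

  public static void main(String[] args) {
    System.out.println(describe());
  }
}
```

The thread is deliberately left unstarted because getThreadGroup() returns null for a thread that has already terminated.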

Introducing virtual threads 2 – Concurrency – Virtual Threads, Structured Concurrency

What are virtual threads?

Virtual threads were introduced in JDK 19 as a preview (JEP 425) and became a final feature in JDK 21 (JEP 444). Virtual threads run on top of platform threads in a one-to-many relationship, while the platform threads run on top of OS threads in a one-to-one relationship as in the following figure:

Figure 10.6 – Virtual threads architecture

If we summarize this figure in a few words then we can say that the JDK maps a large number of virtual threads to a small number of OS threads.

Before creating a virtual thread let’s go through two important notes that will help us to quickly understand the fundamentals of virtual threads. First, let’s have a quick note about the virtual thread’s memory footprint.

Virtual threads are not wrappers of OS threads. They are lightweight Java entities (they have their own stack memory with a small footprint – only a few hundred bytes) that are cheap to create, block, and destroy (creating a virtual thread is around 1000 times cheaper than creating a classical Java thread). There can be a great many of them at the same time (millions) so they sustain a massive throughput. Virtual threads should not be reused (they are disposable) or pooled.

So, when we talk about virtual threads there are more things that we should unlearn than things that we should learn. But, where are virtual threads stored and who’s responsible for scheduling them accordingly?

Virtual threads are stored in the JVM heap (so, they take advantage of Garbage Collector) instead of the OS stack. Moreover, virtual threads are scheduled by the JVM via a work-stealing ForkJoinPool scheduler. Practically, JVM schedules and orchestrates virtual threads to run on platform threads in such a way that a platform thread executes only one virtual thread at a time.

Next, let’s create a virtual thread.

Creating a virtual thread

From the API perspective, a virtual thread is another flavor of java.lang.Thread. If we dig a little bit via getClass(), we see that a virtual thread class is java.lang.VirtualThread which is a final non-public class that extends the BaseVirtualThread class which is a sealed abstract class that extends java.lang.Thread:

final class VirtualThread extends BaseVirtualThread {…}
sealed abstract class BaseVirtualThread extends Thread
  permits VirtualThread, ThreadBuilders.BoundVirtualThread {…}

Let’s consider that we have the following task (Runnable):

Runnable task = () -> logger.info(
  Thread.currentThread().toString());

Creating and starting a virtual thread

We can create and start a virtual thread for our task via the startVirtualThread(Runnable task) method as follows:

Thread vThread = Thread.startVirtualThread(task);
// next you can set its name
vThread.setName("my_vThread");

The returned vThread is scheduled for execution by the JVM itself. But, we can also create and start a virtual thread via Thread.ofVirtual() which returns OfVirtual (sealed interface introduced in JDK 19) as follows:

Thread vThread = Thread.ofVirtual().start(task);
// a named virtual thread
Thread.ofVirtual().name("my_vThread").start(task);

Now, vThread will solve our task.

Moreover, we have the Thread.Builder interface (and the Thread.Builder.OfVirtual subinterface) that can be used to create a virtual thread as follows:

Thread.Builder builder
  = Thread.ofVirtual().name("my_vThread");
Thread vThread = builder.start(task);

Here is another example of creating two virtual threads via Thread.Builder:

Thread.Builder builder
  = Thread.ofVirtual().name("vThread-", 1);
// name "vThread-1"
Thread vThread1 = builder.start(task);
vThread1.join();
logger.info(() -> vThread1.getName() + " terminated");
// name "vThread-2"
Thread vThread2 = builder.start(task);
vThread2.join();
logger.info(() -> vThread2.getName() + " terminated");

Check out these examples in the bundled code.

Introducing virtual threads – Concurrency – Virtual Threads, Structured Concurrency

211. Introducing virtual threads

Java allows us to write multithreaded applications via the java.lang.Thread class. These are classical Java threads that are basically just thin wrappers of OS (kernel) threads. As you’ll see, these classical Java threads are referred to as platform threads and they have been available for quite a long time (from JDK 1.1, as the following diagram reveals):

Figure 10.4 – JDK multithreading evolution

Next, let’s hit the road to JDK 19 virtual threads.

What’s the problem with platform (OS) threads?

OS threads are expensive in every single way, or more clearly, they are costly in time and space. Creating OS threads is a costly operation that requires a lot of stack space (around 20 megabytes) for storing their context, Java call stacks, and additional resources. Moreover, the OS thread scheduler is responsible for scheduling Java threads and this is another costly operation that requires moving around a significant amount of data. This is referred to as thread context switching and it requires a lot of resources to take place.

In the following figure, you can see the one-to-one relationship between a Java thread and an OS thread:

Figure 10.5 – JVM to OS threads

For decades, our multithreaded applications have run in this context. This long time and experience taught us that we can create only a limited number of Java threads (because of low throughput) and that we should reuse them wisely. The number of Java threads is a limiting factor that is usually exhausted before other resources such as network connections, CPU, and so on. Java doesn’t make any distinction between threads that perform computationally intensive tasks (so, threads that are really exploiting the CPU) and threads that just wait for data (they just hang on the CPU).

Let’s have a quick exercise. Let’s assume that our machine has 8 GB of memory and a single Java thread needs 20 MB. This means that we have room for around 400 Java threads (8 GB = 8000 MB, and 8000 MB / 20 MB = 400 threads). Next, let’s assume that these threads perform I/O operations over a network. Each such I/O operation needs around 100 ms to complete, while the request preparation and the response processing need around 500 ns each. So, a thread works for 1000 ns (0.001 ms) and then just waits for 100 ms (100,000,000 ns) for the I/O operation to complete. This means that, at 8 GB of memory, the 400 threads together will use around 0.4% of a CPU core (each thread is busy for about 0.001% of the time), which is very low. We can conclude that a thread is idle for more than 99.99% of the time.

Based on this exercise, it is quite obvious that Java threads become a bottleneck in throughput that doesn’t allow us to use the hardware at full capacity. Of course, we can sweeten the situation a little bit by using thread pools for minimizing the costs but it still does not solve the major issues of dealing with resources. You have to go for CompletableFuture, reactive programming (for instance, Spring Mono and Flux) and so on.

But, how many classical Java threads can we create? We can easily find out by running a simple snippet of code as follows:

AtomicLong counterOSThreads = new AtomicLong();
      
while (true) {
  new Thread(() -> {
    long currentOSThreadNr
      = counterOSThreads.incrementAndGet();
    System.out.println("Thread: " + currentOSThreadNr);
    LockSupport.park();
  }).start();
}

Or, if we want to taste from the new concurrent API, we can call the new Thread.ofPlatform() method as follows (OfPlatform is a sealed interface introduced in JDK 19):

AtomicLong counterOSThreads = new AtomicLong();
while (true) {
  Thread.ofPlatform().start(() -> {
    long currentOSThreadNr
      = counterOSThreads.incrementAndGet();
    System.out.println("Thread: " + currentOSThreadNr);
    LockSupport.park();
  });
}

On my machine, I got an OutOfMemoryError after around 40,000 Java threads. Depending on your OS and hardware this number may vary.

The Thread.ofPlatform() method was added in JDK 19 to easily distinguish between Java threads (classical Java threads as we have known them for decades – thin wrappers of OS threads) and the new kids in town, the virtual threads.
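Returning to the back-of-the-envelope exercise from earlier in this problem, the arithmetic can be spelled out in a few lines. This is only a sketch of that calculation, using the assumed figures from the exercise (8000 MB of memory, 20 MB per thread, 1000 ns of work, 100 ms of waiting); the class and method names are hypothetical.

```java
class ThreadBudgetSketch {

  // Assumed figures taken from the exercise, not measured values
  static final long MEMORY_MB = 8_000;
  static final long STACK_MB_PER_THREAD = 20;
  static final long BUSY_NS = 1_000;       // request + response work
  static final long WAIT_NS = 100_000_000; // 100 ms of I/O waiting

  // How many threads fit in memory
  static long maxThreads() {
    return MEMORY_MB / STACK_MB_PER_THREAD;
  }

  // Fraction of time a single thread actually uses the CPU
  static double busyFractionPerThread() {
    return (double) BUSY_NS / (BUSY_NS + WAIT_NS);
  }

  // CPU usage of all threads together, as a percentage of one core
  static double totalCpuPercent() {
    return maxThreads() * busyFractionPerThread() * 100;
  }

  public static void main(String[] args) {
    System.out.println("Threads: " + maxThreads());
    System.out.printf("Busy per thread: %.4f%%%n",
      busyFractionPerThread() * 100);
    System.out.printf("Total CPU: %.2f%%%n", totalCpuPercent());
  }
}
```

Each thread is busy for roughly 0.001% of the time, so even 400 of them together keep a single core busy for only about 0.4% of the time.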

Using the ExecutorService for virtual threads – Concurrency – Virtual Threads, Structured Concurrency

212. Using the ExecutorService for virtual threads

Virtual threads allow us to write more expressive and straightforward concurrent code. Thanks to the massive throughput obtained via virtual threads we can easily adopt the task-per-thread model (for an HTTP server, this means a request per thread, for a database, this means a transaction per thread, and so on). In other words, we can assign a new virtual thread for each concurrent task. Trying to use the task-per-thread model with platform threads will result in a throughput limited by the number of platform threads that we can afford to create, which is well below what the hardware can actually sustain. This is explained by Little’s law (https://en.wikipedia.org/wiki/Little%27s_law), L = λW, which rearranges to λ = L/W, or throughput equals average concurrency divided by latency.

Whenever possible it is recommended to avoid interacting with threads directly. The JDK sustains this via the ExecutorService/Executor API. More precisely, we are used to submitting a task (Runnable/Callable) to an ExecutorService/Executor and working with the returned Future. This pattern is valid for virtual threads as well.

So, we don’t have to write ourselves all the plumbing code for adopting the task-per-thread model for virtual threads because, starting with JDK 19, this model is available via the Executors class. More precisely, via the newVirtualThreadPerTaskExecutor() method which creates an ExecutorService capable of creating an unbounded number of virtual threads that follows the task-per-thread model. This ExecutorService exposes methods that allow us to give it the tasks, such as the submit() (as you’ll see next) and invokeAll/Any() (as you’ll see later) methods, and return a Future containing an exception or a result.
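To put some numbers on Little’s law as mentioned above, here is a tiny sketch that computes the throughput λ = L/W for a given concurrency and latency. The figures used in main() (400 versus 100,000 concurrent tasks at 100 ms latency) are assumptions chosen for illustration, and the class name LittlesLawSketch is hypothetical.

```java
class LittlesLawSketch {

  // Throughput (tasks per second) = concurrency / latency
  static double throughputPerSecond(long concurrency, long latencyMillis) {
    return concurrency * 1000.0 / latencyMillis;
  }

  public static void main(String[] args) {
    // Assumed: 400 platform threads, each task takes 100 ms
    System.out.println(throughputPerSecond(400, 100));

    // Assumed: 100,000 virtual threads, same 100 ms latency
    System.out.println(throughputPerSecond(100_000, 100));
  }
}
```

With latency fixed, the only way to raise throughput is to raise concurrency, which is what cheap virtual threads make affordable.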

Starting with JDK 19, the ExecutorService extends the AutoCloseable interface. In other words, we can use ExecutorService in a try-with-resources pattern.

Consider the following simple Runnable and Callable:

Runnable taskr = () -> logger.info(
  Thread.currentThread().toString());
      
Callable<Boolean> taskc = () -> {
  logger.info(Thread.currentThread().toString());
  return true;
};

Executing the Runnable/Callable can be done as follows (here, we submit 15 tasks (NUMBER_OF_TASKS = 15)):

try (ExecutorService executor
      = Executors.newVirtualThreadPerTaskExecutor()) {
  for (int i = 0; i < NUMBER_OF_TASKS; i++) {
    executor.submit(taskr); // executing Runnable
    executor.submit(taskc); // executing Callable
  }
}

Of course, in the case of Runnable/Callable we can capture a Future and act accordingly via the blocking get() method or whatever we want to do.

Future<?> futureR = executor.submit(taskr);
Future<Boolean> futureC = executor.submit(taskc);

A possible output looks as follows:

VirtualThread[#28]/runnable@ForkJoinPool-1-worker-6
VirtualThread[#31]/runnable@ForkJoinPool-1-worker-5
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-7
VirtualThread[#25]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#24]/runnable@ForkJoinPool-1-worker-2
VirtualThread[#27]/runnable@ForkJoinPool-1-worker-5
VirtualThread[#26]/runnable@ForkJoinPool-1-worker-4
VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#36]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#37]/runnable@ForkJoinPool-1-worker-2
VirtualThread[#35]/runnable@ForkJoinPool-1-worker-7
VirtualThread[#34]/runnable@ForkJoinPool-1-worker-4
VirtualThread[#32]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#33]/runnable@ForkJoinPool-1-worker-2
VirtualThread[#30]/runnable@ForkJoinPool-1-worker-1

Check out the virtual threads’ ids. They range between #22-#37 without repetition. Each task is executed by its own virtual thread.

The task-per-thread model is also available for classical threads via newThreadPerTaskExecutor(ThreadFactory threadFactory). Here is an example:

static class SimpleThreadFactory implements ThreadFactory {
  @Override
  public Thread newThread(Runnable r) {
    return new Thread(r);                     // classic
   // return Thread.ofVirtual().unstarted(r); // virtual
  }
}
try (ExecutorService executor =
      Executors.newThreadPerTaskExecutor(
          new SimpleThreadFactory())) {
  for (int i = 0; i < NUMBER_OF_TASKS; i++) {
    executor.submit(taskr); // executing Runnable
    executor.submit(taskc); // executing Callable
  }
}

As you can see, newThreadPerTaskExecutor() can be used for classic threads or for virtual threads. The number of created threads is unbounded. By simply modifying the thread factory we can switch between virtual/classic threads.

A possible output looks as follows:

Thread[#75,Thread-15,5,main]
Thread[#77,Thread-17,5,main]
Thread[#76,Thread-16,5,main]
Thread[#83,Thread-23,5,main]
Thread[#82,Thread-22,5,main]
Thread[#80,Thread-20,5,main]
Thread[#81,Thread-21,5,main]
Thread[#79,Thread-19,5,main]
Thread[#78,Thread-18,5,main]
Thread[#89,Thread-29,5,main]
Thread[#88,Thread-28,5,main]
Thread[#87,Thread-27,5,main]
Thread[#86,Thread-26,5,main]
Thread[#85,Thread-25,5,main]
Thread[#84,Thread-24,5,main]

Check out the threads’ ids. They range between #75-#89 without repetition. Each task is executed by its own thread.
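Instead of eyeballing the ids in the output, we can count them. The following is a minimal sketch that submits a number of tasks to newVirtualThreadPerTaskExecutor() and returns how many distinct thread ids were observed; the class name ThreadPerTaskSketch is hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadPerTaskSketch {

  // Runs the given number of tasks and counts the distinct thread ids
  static int distinctThreads(int numberOfTasks) {
    Set<Long> threadIds = ConcurrentHashMap.newKeySet();

    try (ExecutorService executor
        = Executors.newVirtualThreadPerTaskExecutor()) {
      for (int i = 0; i < numberOfTasks; i++) {
        // Each task records the id of the thread that runs it
        executor.submit(
          () -> threadIds.add(Thread.currentThread().threadId()));
      }
    } // close() waits for all the tasks to finish

    return threadIds.size();
  }

  public static void main(String[] args) {
    System.out.println("Distinct threads: " + distinctThreads(15));
  }
}
```

If the executor reused threads, the count would be lower than the number of tasks.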

Explaining how virtual threads work 2 – Concurrency – Virtual Threads, Structured Concurrency

Capturing virtual threads

So far we have learned that a virtual thread is mounted by the JVM on a platform thread which becomes its carrier thread. Moreover, the carrier thread runs the virtual thread until it hits a blocking (I/O) operation. At that point, the virtual thread is unmounted from the carrier thread and it will be rescheduled after the blocking (I/O) operation is done.

While this scenario is true for most of the blocking operations, resulting in unmounting the virtual threads and freeing the platform thread (and the underlying OS thread), there are a few exceptional cases when the virtual threads are not unmounted. There are two main causes for this behavior:

Limitations on OS (for instance, a significant number of filesystem operations)

Limitations on JDK (for instance, Object#wait())

When the virtual thread cannot be unmounted from its carrier thread it means that the carrier thread and the underlying OS thread are blocked. This may affect the scalability of the application, so, if the platform threads pool allows it, JVM can take the decision of adding one more platform thread. So, for a period of time, the number of platform threads may exceed the number of available cores.

Pinning virtual threads

There are also two other use cases when a virtual thread cannot be unmounted:

When the virtual thread runs code inside a synchronized method/block

When the virtual thread invokes a foreign function or native method (topic covered in Chapter 7)

In this scenario, we say that the virtual thread is pinned to the carrier thread. This may affect the scalability of the application, but JVM will not increase the number of platform threads. Instead of this, we should take action and refactor the synchronized blocks to ensure that the locking code is simple, clear, and short. Whenever possible prefer java.util.concurrent locks instead of synchronized blocks. If we manage to avoid long and frequent locking periods then we will not face any significant scalability issues. In future releases, the JDK team aims to eliminate the pinning inside synchronized blocks.
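As an illustration of the advice to prefer java.util.concurrent locks, here is a minimal sketch of a counter guarded by a ReentrantLock instead of a synchronized block, updated from many virtual threads. The class name LockInsteadOfSynchronizedSketch and the number of tasks are assumptions made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

class LockInsteadOfSynchronizedSketch {

  private final ReentrantLock lock = new ReentrantLock();
  private int counter;

  // Same mutual exclusion as a synchronized block, via an explicit lock
  void increment() {
    lock.lock();
    try {
      counter++; // keep the guarded section short
    } finally {
      lock.unlock();
    }
  }

  int value() {
    lock.lock();
    try {
      return counter;
    } finally {
      lock.unlock();
    }
  }

  // Increments the shared counter once per virtual thread
  static int run(int tasks) {
    LockInsteadOfSynchronizedSketch shared
      = new LockInsteadOfSynchronizedSketch();

    try (ExecutorService executor
        = Executors.newVirtualThreadPerTaskExecutor()) {
      for (int i = 0; i < tasks; i++) {
        executor.execute(shared::increment);
      }
    }
    return shared.value();
  }

  public static void main(String[] args) {
    System.out.println("Counter: " + run(1_000));
  }
}
```

A virtual thread waiting on lock() can be unmounted from its carrier, which is the reason for the recommendation above.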

Explaining how virtual threads work – Concurrency – Virtual Threads, Structured Concurrency

213. Explaining how virtual threads work

Now that we know how to create and start a virtual thread, let’s see how it actually works.

Let’s start with a meaningful diagram:

Figure 10.7 – How virtual threads work

As you can see, Figure 10.7 is similar to Figure 10.6, except that we have added a few more elements.

First of all, notice that the platform threads run under a ForkJoinPool umbrella. This is a First-In-First-Out (FIFO) fork/join pool dedicated to scheduling and orchestrating the relationships between virtual threads and platform threads (a detailed coverage of the Java fork/join framework is available in Java Coding Problems, First Edition, Chapter 11).

This dedicated ForkJoinPool is controlled by the JVM and it acts as the virtual thread scheduler based on a FIFO queue. Its initial capacity (number of threads) is equal to the number of available cores and it can be increased up to 256. The default virtual thread scheduler is implemented in the java.lang.VirtualThread class:

private static ForkJoinPool createDefaultScheduler() {…}

Do not confuse this ForkJoinPool with the one used for parallel streams (Common Fork Join Pool – ForkJoinPool.commonPool()).

Between the virtual threads and the platform threads, there is a one-to-many association. Nevertheless, the JVM schedules virtual threads to run on platform threads in such a way that only one virtual thread runs on a platform thread at a time. When the JVM assigns a virtual thread to a platform thread, the so-called stack chunk object of the virtual thread is copied from the heap memory onto the platform thread. If the code running on a virtual thread encounters a blocking (I/O) operation that should be handled by the JVM then the virtual thread is released by copying its stack chunk object back into the heap (this operation of copying the stack chunk between the heap memory and the platform thread is the cost of blocking a virtual thread – this is much cheaper than blocking a platform thread). Meanwhile, the platform thread can run other virtual threads. When the blocking (I/O) operation of the released virtual thread is done, the JVM reschedules the virtual thread for execution on a platform thread. This can be the same platform thread or another one.

The operation of assigning a virtual thread to a platform thread is called mounting. The operation of unassigning a virtual thread from the platform thread is called unmounting. The platform thread running the assigned virtual thread is called a carrier thread.

Let’s look at an example that reveals how virtual threads are mounted:

private static final int NUMBER_OF_TASKS
  = Runtime.getRuntime().availableProcessors();
Runnable taskr = () ->
  logger.info(Thread.currentThread().toString());      
try (ExecutorService executor
     = Executors.newVirtualThreadPerTaskExecutor()) {
  for (int i = 0; i < NUMBER_OF_TASKS + 1; i++) {
    executor.submit(taskr);
  }
}

In this snippet of code, we create a number of virtual threads equal to the number of available cores + 1. On my machine, I have 8 cores (so, 8 carriers) and each of them carries a virtual thread. Since we have + 1, one carrier will work twice. The output reveals this scenario (check out the workers; here, worker-8 runs virtual threads #30 and #31):

VirtualThread[#25]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#30]/runnable@ForkJoinPool-1-worker-8
VirtualThread[#28]/runnable@ForkJoinPool-1-worker-6
VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#24]/runnable@ForkJoinPool-1-worker-2
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-7
VirtualThread[#26]/runnable@ForkJoinPool-1-worker-4
VirtualThread[#27]/runnable@ForkJoinPool-1-worker-5
VirtualThread[#31]/runnable@ForkJoinPool-1-worker-8
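The previous output shows only mounting. To also observe a virtual thread being unmounted and mounted again, we can log its carrier before and after a blocking call. The following is a minimal sketch, not code from the bundle: the CarrierHopSketch class and its carrierOf() helper are hypothetical names, and the helper assumes the toString() format shown above (carrier name after the @ sign), which is a JDK implementation detail.

```java
import java.time.Duration;

public class CarrierHopSketch {

  // Extracts the carrier name from a virtual thread description such as
  // "VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1".
  // Returns an empty string when no carrier is shown (unmounted thread).
  static String carrierOf(String threadDescription) {
    int at = threadDescription.indexOf('@');
    return at < 0 ? "" : threadDescription.substring(at + 1);
  }

  // Runs one virtual thread and records its carrier before and after
  // a blocking call (here, a short sleep).
  static String[] carriersAroundSleep() throws InterruptedException {
    String[] carriers = new String[2];
    Thread vThread = Thread.ofVirtual().start(() -> {
      carriers[0] = carrierOf(Thread.currentThread().toString());
      try {
        // While sleeping, the virtual thread is unmounted
        Thread.sleep(Duration.ofMillis(100));
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
      // After the sleep, the virtual thread has been mounted again
      carriers[1] = carrierOf(Thread.currentThread().toString());
    });
    vThread.join(); // makes the writes above visible to the caller
    return carriers;
  }

  public static void main(String[] args) throws InterruptedException {
    String[] carriers = carriersAroundSleep();
    System.out.println("Before sleep: " + carriers[0]);
    System.out.println("After sleep:  " + carriers[1]);
  }
}
```

Depending on how the scheduler dispatches the virtual thread after the sleep, the two lines may show the same carrier or two different ones.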

However, we can configure this ForkJoinPool via three system properties, as follows:

jdk.virtualThreadScheduler.parallelism – number of platform threads available for scheduling virtual threads (defaults to the number of CPU cores)

jdk.virtualThreadScheduler.maxPoolSize – maximum pool size (defaults to 256)

jdk.virtualThreadScheduler.minRunnable – minimum number of running threads (defaults to half the parallelism)

In a subsequent problem, we will use these properties to better shape virtual thread context switching (mounting/unmounting) details.
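As a quick preview, the following minimal sketch restricts the scheduler to a single carrier and collects the carriers that actually ran the virtual threads. The SchedulerConfigSketch class and the carriersUsed() method are hypothetical names, not part of the bundled code. The sketch assumes that the properties are set before the first virtual thread is created, since the scheduler is built only once. Passing them on the command line (for instance, -Djdk.virtualThreadScheduler.parallelism=1) should have the same effect.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SchedulerConfigSketch {

  // Runs the given number of virtual threads and collects the names of
  // the carrier threads they were mounted on (the part after '@').
  static Set<String> carriersUsed(int tasks) {
    Set<String> carriers = ConcurrentHashMap.newKeySet();
    try (ExecutorService executor
         = Executors.newVirtualThreadPerTaskExecutor()) {
      for (int i = 0; i < tasks; i++) {
        executor.submit(() -> {
          String description = Thread.currentThread().toString();
          carriers.add(
            description.substring(description.indexOf('@') + 1));
        });
      }
    } // close() waits for all submitted tasks to finish
    return carriers;
  }

  public static void main(String[] args) {
    // Assumption: set before any virtual thread has been created
    System.setProperty("jdk.virtualThreadScheduler.parallelism", "1");
    System.setProperty("jdk.virtualThreadScheduler.maxPoolSize", "1");

    System.out.println(carriersUsed(10));
  }
}
```

With a single carrier configured, all ten virtual threads should report the same worker, so the printed set is expected to hold one name.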

214. Hooking virtual threads and sync code

The goal of this problem is to highlight how virtual threads interact with synchronous code. For this, we use the built-in java.util.concurrent.SynchronousQueue. This is a blocking queue with no internal capacity, in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. More precisely, a thread that wants to insert an element in this queue is blocked until another thread attempts to remove an element from it, and a thread that wants to remove an element is blocked until another thread attempts to insert one.

Let’s assume that a virtual thread attempts to insert in a SynchronousQueue, while a platform thread attempts to remove from this queue. In code lines, we have:

SynchronousQueue<Integer> queue = new SynchronousQueue<>();
Runnable task = () -> {
  logger.info(() -> Thread.currentThread().toString()
    + " sleeps for 5 seconds");
  try { Thread.sleep(Duration.ofSeconds(5)); }
    catch (InterruptedException ex) {}
  logger.info(() -> Thread.currentThread().toString()
    + " inserts in the queue");
  queue.add(Integer.MAX_VALUE);
};
logger.info("Before running the task …");
Thread vThread = Thread.ofVirtual().start(task);
logger.info(vThread.toString());

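So, the virtual thread (vThread) waits for 5 seconds before attempting to insert an element in the queue. Note that add() does not block: it succeeds only if another thread is already waiting to remove an element and throws an IllegalStateException otherwise (put() is the blocking variant). Either way, the insert will not succeed until another thread attempts to remove an element from this queue: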

logger.info(() -> Thread.currentThread().toString()
  + " can't take from the queue yet");
int maxint = queue.take();
logger.info(() -> Thread.currentThread().toString()
  + "took from queue: " + maxint);

logger.info(vThread.toString());
logger.info("After running the task …");

Here, Thread.currentThread() refers to the main thread of the application, which is a platform thread not blocked by vThread. This thread successfully removes from the queue only if another thread attempts to insert (here, vThread). The output of this code looks as follows:

[09:41:59] Before running the task …
[09:42:00] VirtualThread[#22]/runnable
[09:42:00] Thread[#1,main,5,main]
           can't take from the queue yet
[09:42:00] VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
           sleeps for 5 seconds
[09:42:05] VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
           inserts in the queue
[09:42:05] Thread[#1,main,5,main]took from queue: 2147483647
[09:42:05] VirtualThread[#22]/terminated
[09:42:05] After running the task …

The virtual thread has started its execution (it is in the runnable state), but the main thread cannot remove from the queue until the virtual thread inserts an element, so it is blocked by the queue.take() operation:

[09:42:00] VirtualThread[#22]/runnable
[09:42:00] Thread[#1,main,5,main]
           can't take from the queue yet

Meanwhile, the virtual thread sleeps for 5 seconds (at this time the main thread has nothing to do) and afterward, it inserts an element:

[09:42:00] VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
           sleeps for 5 seconds
[09:42:05] VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
           inserts in the queue

The virtual thread has inserted an element into the queue, so the main thread can remove this element from it:

[09:42:05] Thread[#1,main,5,main]took from queue: 2147483647

The virtual thread is also terminated:

[09:42:05] VirtualThread[#22]/terminated

So, virtual threads, platform threads, and synchronous code work as expected. In the bundled code, you can find an example where the virtual and platform threads switch places: the platform thread attempts to insert and the virtual thread attempts to remove.
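The bundled example is not reproduced here, but the swapped scenario can be sketched as follows. This is a minimal illustration under our own assumptions: the SwappedRolesSketch class and the handOff() method are hypothetical names, and we use put() instead of add(), so that the platform thread blocks until the virtual thread is ready to take.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class SwappedRolesSketch {

  // The virtual thread takes from the queue, while the calling
  // (platform) thread inserts. Returns the value that was taken.
  static int handOff(int value) throws InterruptedException {
    SynchronousQueue<Integer> queue = new SynchronousQueue<>();
    AtomicInteger received = new AtomicInteger();

    Thread vThread = Thread.ofVirtual().start(() -> {
      try {
        // Blocks until another thread inserts an element
        received.set(queue.take());
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    });

    // put() blocks the calling thread until the virtual thread takes
    queue.put(value);
    vThread.join();
    return received.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("Virtual thread took: "
      + handOff(Integer.MAX_VALUE));
  }
}
```

The hand-off works the same way as before. Only the roles are reversed: this time it is the virtual thread that waits in take() until the platform thread inserts.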