Introducing structured concurrency 2 – Concurrency – Virtual Threads, Structured Concurrency

We skip the rest of the code since you can find it in the bundled code.

Of course, we can implement code-based answers to each of these questions via error handling, task abandonment and abortion, ExecutorService, and so on, but this means a lot of work for the developer. Writing fail-safe solutions that carefully cover all possible scenarios across multiple tasks/subtasks while tracking their progress in a concurrent environment is not an easy job. Not to mention how hard it is for another developer, or even the same developer after 1-2 years or even months, to understand and maintain the resulting code.

It is time to add some structure to this code, so let’s introduce structured concurrency (part of Project Loom).

Structured concurrency relies on several pillars meant to bring lightweight concurrency to Java. The fundamental pillar or principle of structured concurrency is highlighted next.

The fundamental principle of structured concurrency: When a task has to be solved concurrently then all the threads needed to solve it are spun up and rejoined in the same block of code. In other words, the lifetime of all these threads is bound to the block’s scope, so we have clear and explicit entry-exit points for each concurrent code block.

Based on this principle, the thread that initiates a concurrent context is the parent-thread or the owner-thread. All threads started by the parent-thread are children-threads or forks, so between them, these threads are siblings. Together, the parent-thread and the children-threads define a parent-child hierarchy.

The structured concurrency principle can be depicted in a diagram as follows:

Figure 10.2 – Parent-child hierarchy in structured concurrency
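The principle can be approximated with APIs that are final in JDK 21. The following is only a sketch: it uses a try-with-resources ExecutorService rather than the structured concurrency API, so it shows the "spun up and rejoined in the same block" part but not short-circuiting or cancellation propagation. The class name ScopedLifetimeSketch and the child labels are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class name, used only for this illustration.
class ScopedLifetimeSketch {

    // Starts three child threads and rejoins them inside the same block.
    static List<String> runInScope() {
        List<String> finished = new CopyOnWriteArrayList<>();
        // Entry point: ExecutorService is AutoCloseable (JDK 19+), and
        // close() waits for every submitted task to terminate.
        try (ExecutorService scope
                = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int id = 1; id <= 3; id++) {
                String child = "child-" + id;
                scope.submit(() -> { finished.add(child); });
            }
        }
        // Exit point: no child thread outlives the block above.
        return finished;
    }

    public static void main(String[] args) {
        System.out.println("Finished inside the block: " + runInScope());
    }
}
```

The order of the entries may vary between runs, but all three children are always done by the time the block exits.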

In the context of the parent-child hierarchy, we have support for error/exception handling with short-circuiting, cancellation propagation, and monitoring/observability.

Error/exception handling with short-circuiting: If a child-thread fails then all the other child-threads are canceled unless they have already completed. For instance, if futureTester(1) fails, then futureTester(2) and futureTester(3) are automatically canceled.

Cancellation propagation: If the parent-thread is interrupted before joining the child-threads is over then these forks (the child-threads/subtasks) are canceled automatically. For instance, if the thread executing buildTestingTeam() gets interrupted then its three forks are automatically canceled.

Monitoring/observability: A thread dump clearly reveals the entire parent-child hierarchy no matter how many levels have been spawned. Moreover, in structured concurrency, we take advantage of scheduling and memory management of threads.

While these are purely concepts, writing code that respects and follows these concepts requires the proper API and the following awesome callout:

Figure 10.3 – Don’t reuse virtual threads

Cut this out and stick it somewhere to see it every day! So, in structured concurrency, don’t reuse virtual threads. I know what you are thinking: hey dude, threads are expensive and limited, so we have to reuse them. A quick hint: we are talking about virtual threads (massive throughput), not classical threads, but this topic is covered in the next problem.

Cloud computing today – Fundamentals of Cloud Architecture

Cloud computing today

This section provides an up-to-date snapshot of the current state of cloud computing and its impact on businesses and individuals. It explores the widespread adoption of cloud computing across various industries and the benefits it offers, such as scalability, cost-efficiency, and enhanced flexibility. The section also delves into the different types of cloud services available today, including IaaS, PaaS, and SaaS, highlighting their respective features and use cases.

In recent years, cloud computing has transformed the way businesses and individuals access and use technology. It has revolutionized the way we store, process, and share data, enabling greater flexibility, scalability, and cost-efficiency than ever before. With the cloud computing market projected to reach $1 trillion by 2024, it is clear that cloud computing has become an essential part of the modern technology landscape. But what exactly is cloud computing, and how does it work? In this book, we will explore the fundamental concepts of cloud computing, from its history and evolution to its various types and deployment models. We will delve into the benefits and challenges of cloud computing and examine real-world examples of how organizations are leveraging this technology to drive innovation, growth, and success. Whether you are a seasoned IT professional or simply curious about the cloud, this book will provide you with the insights and knowledge you need to navigate this exciting and rapidly changing field.

Cloud computing has become a pervasive technology that has transformed the way businesses and individuals access and use computing resources. At its core, cloud computing is about delivering computing resources over the internet, rather than owning and managing physical infrastructure. This enables greater flexibility and scalability as users can easily scale up or down their resource usage based on their needs. It also offers cost-efficiency as users only pay for what they use and can avoid upfront capital expenses. Additionally, cloud computing offers greater resilience and reliability, as cloud providers typically offer redundancy and failover capabilities to ensure that services remain available even in the event of hardware failure or other issues.

Cloud computing is a paradigm that enables the provisioning of computing resources, encompassing servers, storage, applications, and services through the internet. Instead of possessing and overseeing physical infrastructure, individuals and businesses have the option to lease these resources from cloud providers, paying only for what they consume. This approach presents numerous benefits compared to conventional on-site infrastructure, including enhanced adaptability, scalability, cost-effectiveness, and dependability.

There are several different types of cloud computing services, each offering varying levels of abstraction and control. At the lowest level of abstraction is IaaS, which provides users with access to virtualized computing resources, such as VMs, storage, and networking, that they can use to build and deploy their applications. At a higher level of abstraction is PaaS, which provides a platform on top of which users can build and deploy applications, without having to worry about the underlying infrastructure. Finally, at the highest level of abstraction is SaaS, which provides complete applications that are accessed over the internet, without the need for any installation or maintenance on the user’s part.

The benefits of cloud architecture 2 – Fundamentals of Cloud Architecture

Cloud services provide a range of collaboration tools that enable teams to work together more efficiently and productively. Some of the key collaboration features provided by cloud services are as follows:

  • Real-time collaboration: Cloud services provide real-time collaboration features such as co-authoring, commenting, and chat, allowing teams to work on the same document or project simultaneously and communicate with each other in real time
  • Shared storage: Cloud services provide shared storage, making it easier for teams to access and share files and documents, regardless of their location or device
  • Version control: Cloud services offer version control features that allow teams to track changes made to documents and restore previous versions if necessary
  • Integration with other tools: Cloud services integrate with a wide range of other collaboration tools such as project management tools, instant messaging, and video conferencing, providing a seamless collaboration experience
  • Access control: Cloud services provide access control features that enable teams to control who has access to their files and documents, ensuring that sensitive data is protected
  • Mobile access: Cloud services are accessible from anywhere, on any device, making it easy for teams to collaborate even when they are not in the same location

Cloud-based collaboration tools provided by cloud architecture can help organizations improve their productivity, streamline their workflows, and foster better collaboration among their teams. In today’s fast-paced business environment, the increasing prevalence of remote work and distributed teams has elevated the significance of cloud-based collaboration. By embracing cloud services, organizations can effectively adapt to these changes and gain a competitive edge.

Integration with other tools in cloud architecture allows cloud services to seamlessly integrate with other collaboration and productivity tools used by an organization. This integration helps create a more efficient and streamlined workflow by allowing users to access all their tools and data from a single location.

Some examples of tools that can be integrated with cloud services include project management software, communication and collaboration tools, CRM systems, and email clients. Here are some benefits of integrating cloud services with other tools:

  • Improved productivity: Integration with other tools enables users to access all their data and tools in one place, reducing the need to switch between different applications and improving productivity
  • Better collaboration: Integration with collaboration tools such as instant messaging and video conferencing can improve communication and collaboration among team members
  • Automation: Integration with other tools can enable automation of repetitive tasks, such as data entry and reporting, saving time and reducing the risk of errors
  • Data consistency: Integration with other tools can help ensure data consistency across different systems, reducing the risk of errors and improving data quality
  • Real-time updates: Integration with other tools can enable real-time updates, ensuring that all team members have access to the latest data and information

Cloud computing systems are designed to connect and collaborate with a wide range of existing tools and technologies. This integration enables organizations to use their existing infrastructure, applications, and data within the cloud environment, allowing for smooth data transfer, streamlined workflows, and improved interoperability between different systems. Integration with other tools is therefore an important aspect of cloud architecture: it improves productivity, collaboration, and data quality, and it helps organizations build a cohesive technology ecosystem that supports their business objectives.

The following section provides a concise overview of the essential guidelines for designing and implementing effective cloud architectures. It emphasizes key practices such as scalability, high availability, performance optimization, security implementation, cost optimization, automation, and monitoring.

Introducing virtual threads 2 – Concurrency – Virtual Threads, Structured Concurrency

What are virtual threads?

Virtual threads were introduced in JDK 19 as a preview (JEP 425) and became a final feature in JDK 21 (JEP 444). Virtual threads run on top of platform threads in a one-to-many relationship, while the platform threads run on top of OS threads in a one-to-one relationship, as in the following figure:

Figure 10.6 – Virtual threads architecture

If we summarize this figure in a few words, we can say that the JDK maps a large number of virtual threads to a small number of OS threads.

Before creating a virtual thread, let’s make two important notes that will help us quickly understand the fundamentals of virtual threads. First, let’s have a quick note about the virtual thread’s memory footprint.

Virtual threads are not wrappers of OS threads. They are lightweight Java entities (they have their own stack memory with a small footprint, only a few hundred bytes) that are cheap to create, block, and destroy (creating a virtual thread is around 1000 times cheaper than creating a classical Java thread). There can be a great many of them at the same time (millions), so they sustain a massive throughput. Virtual threads should not be reused (they are disposable) or pooled.

So, when we talk about virtual threads, there are more things that we should unlearn than things that we should learn. But where are virtual threads stored, and who is responsible for scheduling them?

Virtual threads are stored in the JVM heap (so, they take advantage of the garbage collector) instead of the OS stack. Moreover, virtual threads are scheduled by the JVM via a work-stealing ForkJoinPool scheduler. Practically, the JVM schedules and orchestrates virtual threads to run on platform threads in such a way that a platform thread executes only one virtual thread at a time.

Next, let’s create a virtual thread.

Creating a virtual thread

From the API perspective, a virtual thread is another flavor of java.lang.Thread. If we dig a little bit via getClass(), we see that the class of a virtual thread is java.lang.VirtualThread. This is a final non-public class that extends BaseVirtualThread, a sealed abstract class that in turn extends java.lang.Thread:

final class VirtualThread extends BaseVirtualThread {…}
sealed abstract class BaseVirtualThread extends Thread
  permits VirtualThread, ThreadBuilders.BoundVirtualThread {…}

Let’s consider that we have the following task (Runnable):

Runnable task = () -> logger.info(
  Thread.currentThread().toString());

Creating and starting a virtual thread

We can create and start a virtual thread for our task via the startVirtualThread(Runnable task) method as follows:

Thread vThread = Thread.startVirtualThread(task);
// next you can set its name
vThread.setName("my_vThread");

The returned vThread is scheduled for execution by the JVM itself. But, we can also create and start a virtual thread via Thread.ofVirtual() which returns OfVirtual (sealed interface introduced in JDK 19) as follows:

Thread vThread = Thread.ofVirtual().start(task);
// a named virtual thread
Thread.ofVirtual().name("my_vThread").start(task);

Now, vThread will solve our task.

Moreover, we have the Thread.Builder interface (and Thread.Builder.OfVirtual subinterface) that can be used to create a virtual thread as follows:

Thread.Builder builder
  = Thread.ofVirtual().name("my_vThread");
Thread vThread = builder.start(task);

Here is another example of creating two virtual threads via Thread.Builder:

Thread.Builder builder
  = Thread.ofVirtual().name("vThread-", 1);
// name "vThread-1"
Thread vThread1 = builder.start(task);
vThread1.join();
logger.info(() -> vThread1.getName() + " terminated");
// name "vThread-2"
Thread vThread2 = builder.start(task);
vThread2.join();
logger.info(() -> vThread2.getName() + " terminated");

Check out these examples in the bundled code.
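As a quick sanity check of the creation APIs, here is a minimal sketch that builds an unstarted virtual thread, runs it, and inspects a few properties through the regular java.lang.Thread API. The class name VirtualThreadCreationSketch and the helper describe() are hypothetical; only the Thread methods come from the JDK (21 or later).

```java
// Hypothetical class name, used only for this illustration.
class VirtualThreadCreationSketch {

    // Creates a named, unstarted virtual thread via the builder,
    // runs it to completion, and reports some of its properties.
    static String describe() {
        Thread vThread = Thread.ofVirtual()
            .name("my_vThread")
            .unstarted(() -> { /* the task has nothing to do here */ });
        vThread.start();
        try {
            vThread.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore the flag
        }
        // Virtual threads are always daemon threads.
        return vThread.getName()
            + " virtual=" + vThread.isVirtual()
            + " daemon=" + vThread.isDaemon();
    }

    public static void main(String[] args) {
        System.out.println(describe());
        // prints: my_vThread virtual=true daemon=true
    }
}
```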

Explaining how virtual threads work – Concurrency – Virtual Threads, Structured Concurrency

213. Explaining how virtual threads work

Now that we know how to create and start a virtual thread, let’s see how it actually works.

Let’s start with a meaningful diagram:

Figure 10.7 – How virtual threads work

As you can see, Figure 10.7 is similar to Figure 10.6, only that we have added a few more elements.

First of all, notice that the platform threads run under a ForkJoinPool umbrella. This is a First-In-First-Out (FIFO) fork/join pool dedicated to scheduling and orchestrating the relationships between virtual threads and platform threads (a detailed coverage of the Java fork/join framework is available in Java Coding Problems, First Edition, Chapter 11).

This dedicated ForkJoinPool is controlled by the JVM and it acts as the virtual thread scheduler based on a FIFO queue. Its initial capacity (number of threads) is equal to the number of available cores and it can be increased up to 256. The default virtual thread scheduler is implemented in the java.lang.VirtualThread class:

private static ForkJoinPool createDefaultScheduler() {…}

Do not confuse this ForkJoinPool with the one used for parallel streams (Common Fork Join Pool – ForkJoinPool.commonPool()).

Between the virtual threads and the platform threads, there is a one-to-many association. Nevertheless, the JVM schedules virtual threads to run on platform threads in such a way that only one virtual thread runs on a platform thread at a time. When the JVM assigns a virtual thread to a platform thread, the so-called stack chunk object of the virtual thread is copied from the heap memory onto the platform thread. If the code running on a virtual thread encounters a blocking (I/O) operation that should be handled by the JVM then the virtual thread is released by copying its stack chunk object back into the heap (this operation of copying the stack chunk between the heap memory and the platform thread is the cost of blocking a virtual thread, and it is much cheaper than blocking a platform thread). Meanwhile, the platform thread can run other virtual threads. When the blocking (I/O) of the released virtual thread is done, the JVM reschedules the virtual thread for execution on a platform thread. This can be the same platform thread or another one.

The operation of assigning a virtual thread to a platform thread is called mounting. The operation of unassigning a virtual thread from the platform thread is called unmounting. The platform thread running the assigned virtual thread is called a carrier thread.

Let’s have an example that reveals how the virtual threads are mounted:

private static final int NUMBER_OF_TASKS
  = Runtime.getRuntime().availableProcessors();
Runnable taskr = () ->
  logger.info(Thread.currentThread().toString());      
try (ExecutorService executor
     = Executors.newVirtualThreadPerTaskExecutor()) {
  for (int i = 0; i < NUMBER_OF_TASKS + 1; i++) {
    executor.submit(taskr);
  }
}

In this snippet of code, we create a number of virtual threads equal to the number of available cores + 1. On my machine, I have 8 cores (so, 8 carriers) and each of them carries a virtual thread. Since we have + 1, a carrier will work twice. The output reveals this scenario (check out the workers; here, worker-8 runs virtual threads #30 and #31):

VirtualThread[#25]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#30]/runnable@ForkJoinPool-1-worker-8
VirtualThread[#28]/runnable@ForkJoinPool-1-worker-6
VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#24]/runnable@ForkJoinPool-1-worker-2
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-7
VirtualThread[#26]/runnable@ForkJoinPool-1-worker-4
VirtualThread[#27]/runnable@ForkJoinPool-1-worker-5
VirtualThread[#31]/runnable@ForkJoinPool-1-worker-8

But, we can configure the ForkJoinPool via three system properties as follows:

jdk.virtualThreadScheduler.parallelism – number of CPU cores

jdk.virtualThreadScheduler.maxPoolSize – maximum pool size (256)

jdk.virtualThreadScheduler.minRunnable – minimum number of running threads (half the pool size)

In a subsequent problem, we will use these properties to better shape virtual thread context switching (mounting/unmounting) details.
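To observe the effect of these properties, one option is to pass them on the command line with -D and then count how many distinct carriers end up running a batch of tasks. The sketch below assumes the toString() format shown in the earlier output (carrier name after the @ sign), which is an implementation detail; the class name SchedulerPropertiesSketch and its helpers are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class name, used only for this illustration.
class SchedulerPropertiesSketch {

    // Extracts the carrier name from a description such as
    // "VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1".
    static String carrierOf(String threadDescription) {
        int at = threadDescription.indexOf('@');
        return at < 0 ? "" : threadDescription.substring(at + 1);
    }

    // Runs the given number of tasks on virtual threads and collects
    // the names of the carriers that executed them.
    static Set<String> carriersUsed(int tasks) {
        Set<String> carriers = ConcurrentHashMap.newKeySet();
        try (ExecutorService executor
                = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> {
                    carriers.add(
                        carrierOf(Thread.currentThread().toString()));
                });
            }
        }
        return carriers;
    }

    public static void main(String[] args) {
        // Example launch (the values are arbitrary):
        // java -Djdk.virtualThreadScheduler.parallelism=2 \
        //      -Djdk.virtualThreadScheduler.maxPoolSize=2 \
        //      SchedulerPropertiesSketch.java
        System.out.println("parallelism property: " + System.getProperty(
            "jdk.virtualThreadScheduler.parallelism", "(default)"));
        System.out.println("carriers used: " + carriersUsed(20));
    }
}
```

With a lower parallelism value, the set of carriers printed at the end should shrink accordingly.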

Exemplifying thread context switching – Concurrency – Virtual Threads, Structured Concurrency

215. Exemplifying thread context switching

Remember that a virtual thread is mounted on a platform thread and it is executed by that platform thread until a blocking operation occurs. At that point, the virtual thread is unmounted from the platform thread and it will be rescheduled for execution by the JVM later on after the blocking operation is done. This means that, during its lifetime, a virtual thread can be mounted multiple times on different or the same platform thread.

In this problem, let’s write several snippets of code meant to capture and exemplify this behavior.

Example 1

In the first example, let’s consider the following thread factory that we can use to easily switch between the platform and virtual threads:

static class SimpleThreadFactory implements ThreadFactory {
  @Override
  public Thread newThread(Runnable r) {
    return new Thread(r);                      // classic thread
    // return Thread.ofVirtual().unstarted(r); // virtual thread
  }
}

Next, we try to execute the following task via 10 platform threads:

public static void doSomething(int index) {
  logger.info(() -> index + " "
    + Thread.currentThread().toString());
  try { Thread.sleep(Duration.ofSeconds(3)); }
    catch (InterruptedException ex) {
      Thread.currentThread().interrupt(); // restore the interrupt flag
    }
  logger.info(() -> index + " "
    + Thread.currentThread().toString());
}

Between the two logging lines, we have a blocking operation (sleep()). Next, we rely on newThreadPerTaskExecutor() to submit 10 tasks that should log their details, sleep for 3 seconds, and log again:

try (ExecutorService executor = 
    Executors.newThreadPerTaskExecutor(
      new SimpleThreadFactory())) {
  for (int i = 0; i < MAX_THREADS; i++) {
    int index = i;
    executor.submit(() -> doSomething(index));
  }
}

Running this code with platform threads reveals the following side-by-side output:

Figure 10.8 – Using platform threads

By carefully inspecting this figure, we notice that there is a fixed association between these numbers. For instance, the task with id 5 is executed by Thread-5, task 3 by Thread-3, and so on. After sleeping (blocking operation), these numbers are unchanged. This means that while the tasks are sleeping the threads are just hanging and waiting there. They have no work to do.

Let’s switch from platform threads to virtual threads and run again:

@Override
public Thread newThread(Runnable r) {
  // return new Thread(r);                // classic thread
  return Thread.ofVirtual().unstarted(r); // virtual thread
}

Now, the output is summarized in the following figure:

Figure 10.9 – Using virtual threads

This time, we see that things are more dynamic. For instance, the task with id 5 is started by a virtual thread executed by worker-6 but is finished by worker-4. The task with id 3 is started by a virtual thread executed by worker-4 but is finished by worker-6. This means that, while a task is sleeping (blocking operation), the corresponding virtual thread is unmounted and its worker can serve other virtual threads. When the sleeping is over, the JVM schedules the virtual thread for execution and it is mounted on another worker (it could be the same one as well). This is also referred to as thread context switching.
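The same observation can be captured without a logger or screenshots. This is a minimal sketch under two assumptions: the carrier name is parsed from Thread.toString() (an implementation detail, in the format shown in the earlier output), and the class name CarrierSwitchSketch with its helpers is hypothetical. Each task records its carrier before and after a blocking sleep.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class name, used only for this illustration.
class CarrierSwitchSketch {

    // Returns the text after '@' in the thread description, which
    // names the carrier while a virtual thread is mounted.
    static String carrierOf(Thread thread) {
        String text = thread.toString();
        int at = text.indexOf('@');
        return at < 0 ? "(unmounted)" : text.substring(at + 1);
    }

    // Runs the tasks on virtual threads; each one reports its carrier
    // before and after a blocking operation.
    static List<String> run(int tasks) {
        List<Callable<String>> work = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            int index = i;
            work.add(() -> {
                String before = carrierOf(Thread.currentThread());
                Thread.sleep(Duration.ofMillis(200)); // unmounts here
                String after = carrierOf(Thread.currentThread());
                return index + ": " + before + " -> " + after;
            });
        }
        try (ExecutorService executor
                = Executors.newVirtualThreadPerTaskExecutor()) {
            return executor.invokeAll(work).stream()
                .map(Future::resultNow)
                .toList();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        run(10).forEach(System.out::println);
    }
}
```

Whether a given task resumes on a different carrier depends on the scheduler, so some lines may show the same worker on both sides of the arrow.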

Introducing the ExecutorService invoke all/any for virtual threads – part 1 – Concurrency – Virtual Threads, Structured Concurrency

216. Introducing the ExecutorService invoke all/any for virtual threads – part 1

We have introduced the ExecutorService’s invokeAll()/invokeAny() in Java Coding Problems, First Edition, Chapter 10, Problem 207.

Working with invokeAll()

In a nutshell, invokeAll() executes a collection of tasks (Callable) and returns a List<Future> that holds the results/status of each task. The tasks can finish naturally or be cut short by a given timeout. Each task can finish successfully or exceptionally. Upon return from the timed variant, all the tasks that have not been completed yet are automatically canceled. We can check out the status of each task via Future#isDone() and Future#isCancelled().

<T> List<Future<T>> invokeAll(Collection<? extends
  Callable<T>> tasks) throws InterruptedException
<T> List<Future<T>> invokeAll(
  Collection<? extends Callable<T>> tasks, long timeout,
    TimeUnit unit) throws InterruptedException

Using invokeAll() with virtual threads via newVirtualThreadPerTaskExecutor() is straightforward (or with newThreadPerTaskExecutor()). For instance, here we have a simple example of executing three Callable instances:

try (ExecutorService executor
    = Executors.newVirtualThreadPerTaskExecutor()) {
  List<Future<String>> futures = executor.invokeAll(
    List.of(() -> "pass01", () -> "pass02", () -> "pass03"));

  futures.forEach(f -> logger.info(() ->
    "State: " + f.state()));
}

Have you spotted the f.state() call? This API was introduced in JDK 19 and it computes the state of a future based on the well-known get(), isDone(), and isCancelled(). While we will detail this in a subsequent problem, at this moment the output will be as follows:

[10:17:41] State: SUCCESS
[10:17:41] State: SUCCESS
[10:17:41] State: SUCCESS

The three tasks have successfully completed.
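The timed overload of invokeAll() behaves differently when a task overruns the timeout. The following sketch (hypothetical class name InvokeAllTimeoutSketch, arbitrary durations) submits one fast task and one slow task with a 500 ms timeout and collects the resulting states.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this illustration.
class InvokeAllTimeoutSketch {

    // The first task returns at once; the second sleeps far longer
    // than the timeout, so it is canceled when invokeAll() returns.
    static List<Future.State> statesAfterTimeout() {
        List<Callable<String>> tasks = List.of(
            () -> "pass01",
            () -> {
                Thread.sleep(Duration.ofSeconds(5));
                return "pass02";
            });
        try (ExecutorService executor
                = Executors.newVirtualThreadPerTaskExecutor()) {
            return executor.invokeAll(tasks, 500, TimeUnit.MILLISECONDS)
                .stream()
                .map(Future::state)
                .toList();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(statesAfterTimeout());
        // prints: [SUCCESS, CANCELLED]
    }
}
```

The canceled task is interrupted, so closing the executor does not wait for the full five seconds.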

Working with invokeAny()

In a nutshell, invokeAny() executes a collection of tasks (Callable) and strives to return a result corresponding to a task that has successfully terminated (before the given timeout, if any). All the tasks that have not been completed are automatically canceled.

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
  throws InterruptedException, ExecutionException
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
  long timeout, TimeUnit unit) throws InterruptedException,
    ExecutionException, TimeoutException

Using invokeAny() with virtual threads via newVirtualThreadPerTaskExecutor() is also straightforward (or with newThreadPerTaskExecutor()). For instance, here we have a simple example of executing three Callable instances while we are interested in a single result:

try (ExecutorService executor
    = Executors.newVirtualThreadPerTaskExecutor()) {
  String result = executor.invokeAny(
    List.of(() -> "pass01", () -> "pass02", () -> "pass03"));
          
  logger.info(result);
}

A possible output can be:

[10:29:33] pass02

This output corresponds to the second Callable.

In the next problem, we will come up with a more realistic example.
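To make the "first successful task wins" rule visible, the tasks need different outcomes. In this sketch (hypothetical class name InvokeAnySketch, arbitrary delays and messages), the first task fails immediately, the second succeeds after 200 ms, and the third would need 5 seconds.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class name, used only for this illustration.
class InvokeAnySketch {

    // A failed task is never chosen, so the quick failure of the first
    // task is skipped and the fastest successful task wins.
    static String firstSuccessful() {
        List<Callable<String>> tasks = List.of(
            () -> { throw new IllegalStateException("unavailable"); },
            () -> {
                Thread.sleep(Duration.ofMillis(200));
                return "pass02";
            },
            () -> {
                Thread.sleep(Duration.ofSeconds(5));
                return "pass03";
            });
        try (ExecutorService executor
                = Executors.newVirtualThreadPerTaskExecutor()) {
            return executor.invokeAny(tasks);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            // Thrown only when no task completes successfully.
            throw new IllegalStateException(ex.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(firstSuccessful()); // prints: pass02
    }
}
```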

Combining newVirtualThreadPerTaskExecutor() and streams – Concurrency – Virtual Threads, Structured Concurrency

219. Combining newVirtualThreadPerTaskExecutor() and streams

Streams and newVirtualThreadPerTaskExecutor() are a handy combination. Here is an example that relies on IntStream to submit 10 simple tasks and collect the returned List of Future:

try (ExecutorService executor
      = Executors.newVirtualThreadPerTaskExecutor()) {
  List<Future<String>> futures = IntStream.range(0, 10)
    .mapToObj(i -> executor.submit(() -> {
       return Thread.currentThread().toString()
         + "(" + i + ")";
  })).collect(toList());
  // here we have the following snippet of code
}

Next, we wait for each Future to complete by calling the get() method:

  futures.forEach(f -> {
    try {
      logger.info(f.get());
    } catch (InterruptedException | ExecutionException ex) {
      // handle exception
    }
  });

Moreover, using stream pipelines is quite useful in combination with invokeAll(). For instance, the following stream pipeline returns a List of results (it filters out every Future that hasn’t completed successfully):

List<String> results = executor.invokeAll(
  List.of(() -> "pass01", () -> "pass02", () -> "pass03"))
  .stream()
  .filter(f -> f.state() == Future.State.SUCCESS)
  .<String>mapMulti((f, c) -> {
    c.accept((String) f.resultNow());
  }).collect(Collectors.toList());

Alternatively, we can write the following solution (without mapMulti()):

List<String> results = executor.invokeAll(
  List.of(() -> "pass01", () -> "pass02", () -> "pass03"))
  .stream()
  .filter(f -> f.state() == Future.State.SUCCESS)
  .map(f -> f.resultNow().toString())
  .toList();

Of course, if List<Object> is all you need then you can go straightforward via Future::resultNow as follows:

List<Object> results = executor.invokeAll(
  List.of(() -> "pass01", () -> "pass02", () -> "pass03"))
  .stream()
  .filter(f -> f.state() == Future.State.SUCCESS)
  .map(Future::resultNow)
  .toList();

On the other hand, you may need to collect all the Future instances that have completed exceptionally. This can be achieved via exceptionNow() as follows (we intentionally sneaked into the given List<Callable> a Callable that will generate a StringIndexOutOfBoundsException, () -> "pass02".substring(50)):

List<Throwable> exceptions = executor.invokeAll(
  List.of(() -> "pass01",
          () -> "pass02".substring(50), () -> "pass03"))
  .stream()
  .filter(f -> f.state() == Future.State.FAILED)
  .<Throwable>mapMulti((f, c) -> {
    c.accept((Throwable) f.exceptionNow());
  }).collect(Collectors.toList());

If you prefer not to use mapMulti() then rely on the classical approach:

List<Throwable> exceptions = executor.invokeAll(
  List.of(() -> "pass01", () -> "pass02".substring(50),
          () -> "pass03"))
  .stream()
  .filter(f -> f.state() == Future.State.FAILED)
  .map(Future::exceptionNow)
  .toList();

You can find all these examples in the bundled code.
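When only an overview is needed, the successful and failed futures can be tallied in a single pass instead of two filtered pipelines. The sketch below groups the futures by Future.State and counts them; the class name FutureStateSummarySketch and the method countByState() are hypothetical, and the tasks are the same three used above.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

// Hypothetical class name, used only for this illustration.
class FutureStateSummarySketch {

    // Counts how many futures ended up in each state.
    static Map<Future.State, Long> countByState() {
        List<Callable<String>> tasks = List.of(
            () -> "pass01",
            () -> "pass02".substring(50), // fails at runtime
            () -> "pass03");
        try (ExecutorService executor
                = Executors.newVirtualThreadPerTaskExecutor()) {
            return executor.invokeAll(tasks).stream()
                .collect(Collectors.groupingBy(
                    Future::state, Collectors.counting()));
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        Map<Future.State, Long> counts = countByState();
        System.out.println("SUCCESS: " + counts.get(Future.State.SUCCESS));
        System.out.println("FAILED: " + counts.get(Future.State.FAILED));
    }
}
```

With these three tasks, two futures land under SUCCESS and one under FAILED.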

Introducing ShutdownOnSuccess – Concurrency – Virtual Threads, Structured Concurrency

221. Introducing ShutdownOnSuccess

In the previous problem, we introduced StructuredTaskScope and used it to solve a task via a single virtual thread (a single Subtask). Basically, we fetched the tester with id 1 from our server (we had to wait until this one was available). Next, let’s assume that we still need a single tester, but not necessarily the one with id 1. This time, it could be any of ids 1, 2, or 3. We simply take the first one that is available from these three, and we cancel the other two requests.

Especially for such scenarios, we have an extension of StructuredTaskScope called StructuredTaskScope.ShutdownOnSuccess. This scope is capable of returning the result of the first task that completes successfully and interrupting the rest of the threads. It follows the “invoke any” model and it can be used as follows:

public static TestingTeam buildTestingTeam()
       throws InterruptedException, ExecutionException {
      
  try (ShutdownOnSuccess scope
      = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
          
    Subtask<String> subtask1
      = scope.fork(() -> fetchTester(1));
    Subtask<String> subtask2
      = scope.fork(() -> fetchTester(2));
    Subtask<String> subtask3
      = scope.fork(() -> fetchTester(3));
                                               
    scope.join();
          
    logger.info(() -> "Subtask-1 state: " + subtask1.state());
    logger.info(() -> "Subtask-2 state: " + subtask2.state());
    logger.info(() -> "Subtask-3 state: " + subtask3.state());
    String result = (String) scope.result();
    logger.info(result);
          
    return new TestingTeam(result);
  }
}

Here, we fork three subtasks (threads) that will compete with each other to complete. The first subtask (thread) that completes successfully wins and returns. The result() method returns this result (if none of the subtasks (threads) complete successfully then it throws ExecutionException).

If we check the state of these three Subtask instances we can see that one succeeds while the other two are unavailable:

[09:01:50] Subtask-1 state: UNAVAILABLE
[09:01:50] Subtask-2 state: SUCCESS
[09:01:50] Subtask-3 state: UNAVAILABLE

Of course, you don’t need the code that checks/prints the state of each Subtask. It was added here just to highlight how ShutdownOnSuccess works. You don’t even need the explicit Subtask objects since we don’t call get() or something else from this API. Basically, we can reduce the code to the following:

public static TestingTeam buildTestingTeam()
       throws InterruptedException, ExecutionException {
  try (ShutdownOnSuccess<String> scope
      = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    scope.fork(() -> fetchTester(1));
    scope.fork(() -> fetchTester(2));
    scope.fork(() -> fetchTester(3));
    scope.join();
     
    return new TestingTeam(scope.result());
  }
}

Done! You just create the scope, fork your subtasks, call join(), and collect the result. So, the scope is really business-focused.

A task that completes exceptionally under the ShutdownOnSuccess umbrella will never be chosen to produce a result. However, if all tasks complete exceptionally, then we will get an ExecutionException that wraps the exception (the cause) of the first completed task.
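The same “invoke any” behavior can also be observed without StructuredTaskScope, which is still a preview API in JDK 21. The following is only a minimal sketch, not the bundled code: it swaps ShutdownOnSuccess for the long-standing ExecutorService.invokeAny() running on a virtual-thread-per-task executor (JDK 21 is assumed). The class name InvokeAnySketch and this version of fetchTester(), which fails for id 1 and sleeps briefly for the other ids, are hypothetical stand-ins.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class InvokeAnySketch {

  // Hypothetical stand-in for fetchTester(): id 1 always fails,
  // the other ids answer after a delay proportional to the id
  static String fetchTester(int id) throws Exception {
    if (id == 1) {
      throw new IllegalStateException("Tester 1 is unavailable");
    }
    Thread.sleep(100L * id);
    return "Tester-" + id;
  }

  // Returns the first successful result and cancels the remaining tasks;
  // throws ExecutionException if every task fails
  static String firstAvailableTester(List<Integer> ids)
      throws InterruptedException, ExecutionException {
    try (ExecutorService executor
        = Executors.newVirtualThreadPerTaskExecutor()) {
      List<Callable<String>> tasks = ids.stream()
        .<Callable<String>>map(id -> () -> fetchTester(id))
        .toList();
      return executor.invokeAny(tasks);
    }
  }

  public static void main(String[] args) throws Exception {
    // id 1 fails and id 2 finishes before id 3, so id 2 should win
    System.out.println(firstAvailableTester(List.of(1, 2, 3)));
  }
}
```

Calling firstAvailableTester(List.of(1)) leaves no task that can succeed, so the call ends with an ExecutionException whose cause is the IllegalStateException thrown by the failed task. This mirrors the all-tasks-fail case described above.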

Combining StructuredTaskScope and streams 3 – Concurrency – Virtual Threads, Structured Concurrency

Running 10000 tasks via fixed thread pool executor

Depending on your machine, the previous test may finish successfully or it may result in an OutOfMemoryError. We can avoid this unpleasant scenario by using a fixed thread pool. For instance, let’s limit the number of platform threads to 200 via the following snippet of code:

long start = System.currentTimeMillis();
       
try (ExecutorService executorFixed
    = Executors.newFixedThreadPool(200)) {
  IntStream.range(0, 10_000).forEach(i -> {
    executorFixed.submit(() -> {
      Thread.sleep(Duration.ofSeconds(1));
      logger.info(() -> "Task: " + i);
      return i;
    });
  });
}
logger.info(() -> "Time (ms): "
  + (System.currentTimeMillis() - start));

On my machine, it took 50190 ms (50 seconds) to run these 10000 tasks using at peak 216 platform threads. The following screenshot from JMX reveals this information:

Figure 10.15 – Running 10000 tasks via fixed thread pool executor
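The measured time is close to what simple arithmetic predicts. With 200 pool threads and tasks that each block for one second, the 10000 tasks run in 10000 / 200 = 50 waves of roughly one second each. The following sketch only captures this back-of-the-envelope lower bound (the class and method names are hypothetical, and scheduling overhead is ignored):

```java
class PoolTimeEstimate {

  // Lower bound on the wall-clock time when every task blocks for
  // taskMillis and at most poolSize tasks can run at the same time
  static long estimateMillis(int tasks, int poolSize, long taskMillis) {
    long waves = (tasks + poolSize - 1) / poolSize; // ceiling division
    return waves * taskMillis;
  }

  public static void main(String[] args) {
    // 10000 tasks, 200 threads, 1000 ms per task
    System.out.println(estimateMillis(10_000, 200, 1_000)); // 50000
  }
}
```

The estimate of 50000 ms is only slightly below the 50190 ms measured above, which suggests that almost all of the time is spent waiting for a free pool thread.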

Obviously, using fewer platform threads costs performance. If we put 216 workers to do the job of 7729 workers, it will take longer for sure. Next, let’s see how virtual threads will handle this challenge.

Running 10000 tasks via virtual thread per task executor

This time, let’s see how the newVirtualThreadPerTaskExecutor() can handle these 10000 tasks. The code is straightforward:

long start = System.currentTimeMillis();
      
try (ExecutorService executorVirtual
      = Executors.newVirtualThreadPerTaskExecutor()) {
  IntStream.range(0, 10_000).forEach(i -> {
    executorVirtual.submit(() -> {
      Thread.sleep(Duration.ofSeconds(1));
      logger.info(() -> "Task: " + i);
      return i;
    });
  });
}
      
logger.info(() -> "Time (ms): "
  + (System.currentTimeMillis() - start));

On my machine, it took 3519 ms (3.5 seconds) to run these 10000 tasks using at peak 25 platform threads. The following screenshot from JMX reveals this information:

Figure 10.16 – Running 10000 tasks via virtual thread per task executor

Wow! How cool is this?! The resulting time is by far the best of all the previous tests, and it uses fewer resources (only 25 platform threads). So, virtual threads really rock!

I also strongly recommend you check out the following benchmark: https://github.com/colincachia/loom-benchmark/tree/main.

Starting with JDK 21, the JMX HotSpotDiagnosticMXBean was enriched with the dumpThreads(String outputFile, ThreadDumpFormat format) method. This method outputs a thread dump to the given file (outputFile) in the given format (format). The thread dump will contain all platform threads, but it may contain some or all virtual threads.

In the following code, we attempt to obtain a thread dump for all subtasks (threads) of a StructuredTaskScope:

try (ShutdownOnSuccess<String> scope
  = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
  Stream.of(1, 2, 3)
        .<Callable<String>>map(id -> () -> fetchTester(id))
        .forEach(scope::fork);
  HotSpotDiagnosticMXBean mBean = ManagementFactory
    .getPlatformMXBean(HotSpotDiagnosticMXBean.class);
  // dump while the forks are still running (before join())
  mBean.dumpThreads(Path.of("dumpThreads.json")
    .toAbsolutePath().toString(),
    HotSpotDiagnosticMXBean.ThreadDumpFormat.JSON);
  scope.join();
  String result = scope.result();
  logger.info(result);
}

The output file is named dumpThreads.json and you can find it in the root folder of the application. The part of the output that we are interested in is partially listed here:


{
  "container": "java.util.concurrent
             .StructuredTaskScope$ShutdownOnSuccess@6d311334",
  "parent": "<root>",
  "owner": "1",
  "threads": [
    {
    "tid": "22",
    "name": "",
    "stack": [
      …
      "java.base\/java.lang.VirtualThread
       .run(VirtualThread.java:311)"
      ]
    },
    {
    "tid": "24",
    "name": "",
    "stack": [
      …
      "java.base\/java.lang.VirtualThread
        .run(VirtualThread.java:311)"
      ]
    },
    {
    "tid": "25",
    "name": "",
    "stack": [
      …
      "java.base\/java.lang.VirtualThread
        .run(VirtualThread.java:311)"
      ]
    }
  ],
  "threadCount": "3"
}

As you can see, we have three virtual threads (#22, #24, and #25) that run subtasks of our scope. In the bundled code, you can find the complete output.
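A similar dump can be produced without the preview StructuredTaskScope API. The following is a minimal sketch under stated assumptions, not the bundled code: it runs a few sleeping tasks on a virtual-thread-per-task executor (JDK 21 assumed) and calls dumpThreads() while they are still blocked. The class name ThreadDumpSketch, the 500 ms sleep, and the temporary directory are hypothetical choices; a fresh directory is used so the target file is guaranteed not to exist yet.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadDumpSketch {

  // Starts the given number of sleeping virtual threads and writes
  // a JSON thread dump while they are still blocked
  static Path dumpWhileTasksRun(int tasks) throws IOException {
    // a fresh temp directory guarantees a new, absolute target path
    Path file = Files.createTempDirectory("dump")
      .resolve("dumpThreads.json").toAbsolutePath();

    try (ExecutorService executor
        = Executors.newVirtualThreadPerTaskExecutor()) {
      for (int i = 0; i < tasks; i++) {
        executor.submit(() -> {
          Thread.sleep(Duration.ofMillis(500));
          return null;
        });
      }

      HotSpotDiagnosticMXBean mBean = ManagementFactory
        .getPlatformMXBean(HotSpotDiagnosticMXBean.class);
      mBean.dumpThreads(file.toString(),
        HotSpotDiagnosticMXBean.ThreadDumpFormat.JSON);
    } // close() waits for the submitted tasks to finish

    return file;
  }

  public static void main(String[] args) throws IOException {
    Path file = dumpWhileTasksRun(3);
    System.out.println(file + " (" + Files.size(file) + " bytes)");
  }
}
```

In the resulting JSON, the executor is expected to show up as its own thread container, much like the ShutdownOnSuccess scope in the listing above, although the exact container name and thread ids will differ from run to run.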

Summary

This chapter covered 16 introductory problems about virtual threads and structured concurrency. You can see this chapter as the preparation for the next one, which will cover more detailed aspects of these two topics.