Introducing structured concurrency 2 – Concurrency – Virtual Threads, Structured Concurrency

We skip the rest of the code since you can find it in the bundled code.

Of course, we can implement code-based answers to each of these questions via error handling, task abandonment and abortion, ExecutorService, and so on, but this means a lot of work for the developer. Writing failsafe solutions that carefully cover all possible scenarios across multiple tasks/subtasks while tracking their progress in a concurrent environment is not an easy job. Not to mention how hard it is for another developer (or even the same developer one or two years, or even a few months, later) to understand and maintain the resulting code.

It is time to add some structure to this code, so let's introduce structured concurrency (part of Project Loom). Structured concurrency relies on several pillars meant to bring lightweight concurrency to Java. The fundamental pillar, or principle, of structured concurrency is highlighted next.

The fundamental principle of structured concurrency: When a task has to be solved concurrently, all the threads needed to solve it are spawned and rejoined in the same block of code. In other words, the lifetime of all these threads is bound to the block's scope, so we have clear and explicit entry and exit points for each concurrent code block.
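The following is a minimal sketch of the spawn-and-rejoin idea. It uses only plain JDK 21 APIs, not a structured concurrency API. The three child tasks are hypothetical placeholders that return strings. They are started and rejoined inside one try-with-resources block, because ExecutorService.close() waits for every submitted task. This only approximates the principle: it gives no short-circuiting and no cancellation propagation.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SpawnAndRejoin {

  static List<Future<String>> runInBlock() {
    List<Future<String>> futures;
    // Entry point: all child threads are spawned inside this block...
    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
      futures = List.of(
          executor.submit(() -> "child-1"),
          executor.submit(() -> "child-2"),
          executor.submit(() -> "child-3"));
    } // ...exit point: close() waits until every child has finished
    return futures;
  }

  public static void main(String[] args) {
    // No child outlives the block, so every future is already done here
    runInBlock().forEach(f -> System.out.println(f.state() + " " + f.resultNow()));
  }
}
```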

Based on this principle, the thread that initiates a concurrent context is the parent thread or the owner thread. All threads started by the parent thread are child threads or forks, so they are siblings of one another. Together, the parent thread and the child threads define a parent-child hierarchy.

Putting the structured concurrency principle in a diagram can be seen as follows:

Figure 10.2 – Parent-child hierarchy in structured concurrency

In the context of the parent-child hierarchy, we have support for error/exception handling with short-circuiting, cancellation propagation, and monitoring/observability:

  • Error/exception handling with short-circuiting: If a child thread fails, then all the other child threads that are not yet complete are canceled. For instance, if futureTester(1) fails, then futureTester(2) and futureTester(3) are automatically canceled.
  • Cancellation propagation: If the parent thread is interrupted before joining the child threads is over, then these forks (the child threads/subtasks) are canceled automatically. For instance, if the thread executing buildTestingTeam() gets interrupted, then its three forks are automatically canceled.
  • Monitoring/observability: A thread dump clearly reveals the entire parent-child hierarchy, no matter how many levels have been spawned.

Moreover, in structured concurrency, we take advantage of the scheduling and memory management of threads. While these are purely concepts, writing code that respects and follows them requires the proper API and the following awesome callout:

Figure 10.3 – Don’t reuse virtual threads

Cut this out and stick it somewhere you will see it every day! So, in structured concurrency, don't reuse virtual threads. I know what you are thinking: hey dude, threads are expensive and limited, so we have to reuse them. A quick hint: we are talking about virtual threads (massive throughput), not classical threads, but this topic is covered in the next problem.
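To see how much work short-circuiting takes without structured concurrency, here is a hand-rolled approximation built on a plain ExecutorService and ExecutorCompletionService. It is not the structured concurrency API, and runAllOrFail() is a hypothetical helper. It waits for tasks in completion order. On the first failure it cancels (interrupts) the siblings that are still running and rethrows. Results come back in completion order, not submission order.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class ShortCircuitSketch {

  // Runs all tasks; on the first failure, cancels the siblings still running and rethrows
  static <T> List<T> runAllOrFail(List<Callable<T>> tasks)
      throws InterruptedException, ExecutionException {
    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
      CompletionService<T> completion = new ExecutorCompletionService<>(executor);
      List<Future<T>> futures = new ArrayList<>();
      for (Callable<T> task : tasks) {
        futures.add(completion.submit(task));
      }
      List<T> results = new ArrayList<>();
      try {
        for (int i = 0; i < tasks.size(); i++) {
          results.add(completion.take().get()); // next task to complete, in completion order
        }
      } catch (ExecutionException e) {
        futures.forEach(f -> f.cancel(true));   // short-circuit: interrupt the siblings
        throw e;
      }
      return results;
    } // close() waits for every (possibly canceled) child before the method returns
  }

  public static void main(String[] args) throws InterruptedException {
    long start = System.nanoTime();
    try {
      runAllOrFail(List.<Callable<String>>of(
          () -> { throw new IllegalStateException("tester 1 failed"); },
          () -> { Thread.sleep(Duration.ofSeconds(10)); return "tester 2"; }));
    } catch (ExecutionException e) {
      long ms = Duration.ofNanos(System.nanoTime() - start).toMillis();
      System.out.println("Failed fast: " + e.getCause().getMessage() + " after " + ms + " ms");
    }
  }
}
```

The slow sibling does not keep the caller waiting for 10 seconds, because it is interrupted as soon as the first failure is observed.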

Understanding cloud architecture – Fundamentals of Cloud Architecture


To comprehend the inner workings of cloud computing, it is crucial to understand its underlying architecture. This section provides a comprehensive overview of cloud architecture, elucidating the key components and their interconnections. It explains the concepts of virtualization, distributed computing, and load balancing, which form the building blocks of cloud infrastructure.

Cloud architecture is a term that’s used to describe the design and organization of a cloud computing system. A cloud computing system typically consists of various components, including computing resources, storage, network infrastructure, security measures, and software applications. Cloud architecture refers to the way these components are organized and integrated to provide a seamless and efficient cloud computing environment. The following figure depicts a basic cloud architecture design. It covers the end user connection, backend/database, memory cache, middleware, and frontend in Google Cloud:

Figure 1.3 – A basic cloud architecture

Cloud architecture involves making critical decisions regarding the cloud deployment model, cloud service model, and cloud providers, among others. These decisions will affect the performance, scalability, security, and cost-effectiveness of the cloud computing system. A well-designed cloud architecture should enable an organization to leverage the benefits of cloud computing, such as cost savings, scalability, and flexibility, while minimizing the potential risks and drawbacks.

Cloud architecture is an essential aspect of any cloud computing project, and it requires a deep understanding of cloud computing technologies, business requirements, and architecture principles. A successful cloud architect must be able to design and implement cloud solutions that meet the specific needs of their organization, whether it is a small business, a large enterprise, or a government agency.

Cloud architecture can also be described as a set of principles, guidelines, and best practices that are used to design and manage cloud computing systems. It involves planning, designing, implementing, and managing cloud-based solutions that meet specific business needs and requirements.

The following figure showcases a visual representation of cloud computing, highlighting the different deployment models and service models:

Figure 1.4 – A visual representation of cloud computing

At a high level, cloud architecture involves several key components, including the following:

  • Cloud service models: Cloud computing provides three distinct service models: IaaS, PaaS, and SaaS. Each model offers users different levels of control, flexibility, and customization. For instance, IaaS examples include Amazon Web Services (AWS) EC2 and Microsoft Azure Virtual Machines, which grant users access to virtual servers and infrastructure resources. PaaS examples include Google Cloud Platform's App Engine and Heroku, which provide managed platforms for application development and deployment. Lastly, SaaS examples include Salesforce, a cloud-based CRM platform, and Google Workspace, a suite of productivity and collaboration tools. These examples demonstrate how IaaS empowers users to provision and oversee virtual infrastructure, PaaS abstracts the underlying platform for application development, and SaaS grants access to fully functional software over the internet. By using these distinct service models, organizations can leverage cloud-based resources and software without having to manage physical hardware or, in the case of SaaS, install software locally.
  • Cloud deployment models: Cloud computing deployment models include public cloud, private cloud, hybrid cloud, and multi-cloud, each presenting unique advantages and challenges. In a public cloud, computing resources offered by providers such as AWS, Microsoft Azure, and Google Cloud Platform are shared among multiple organizations and accessed over the internet. Private cloud, on the other hand, involves cloud infrastructure dedicated to a single organization, deployed on-premises or hosted by a third party, offering greater control and privacy. Hybrid cloud combines public and private cloud environments, enabling organizations to pair the control of a private cloud with the scalability and flexibility of the public cloud. Multi-cloud refers to using multiple cloud service providers concurrently, allowing for workload distribution, redundancy, cost optimization, and access to specialized services. These deployment models grant varying levels of control, flexibility, and scalability, enabling organizations to tailor their cloud strategies to their specific needs and leverage the full benefits of cloud computing.
The benefits of cloud architecture 2 – Fundamentals of Cloud Architecture

Cloud services provide a range of collaboration tools that enable teams to work together more efficiently and productively. Some of the key collaboration features provided by cloud services are as follows:

  • Real-time collaboration: Cloud services provide real-time collaboration features such as co-authoring, commenting, and chat, allowing teams to work on the same document or project simultaneously and communicate with each other in real time
  • Shared storage: Cloud services provide shared storage, making it easier for teams to access and share files and documents, regardless of their location or device
  • Version control: Cloud services offer version control features that allow teams to track changes made to documents and restore previous versions if necessary
  • Integration with other tools: Cloud services integrate with a wide range of other collaboration tools such as project management tools, instant messaging, and video conferencing, providing a seamless collaboration experience
  • Access control: Cloud services provide access control features that enable teams to control who has access to their files and documents, ensuring that sensitive data is protected
  • Mobile access: Cloud services are accessible from anywhere, on any device, making it easy for teams to collaborate even when they are not in the same location

Cloud-based collaboration tools provided by cloud architecture can help organizations improve their productivity, streamline their workflows, and foster better collaboration among their teams. In today’s fast-paced business environment, the increasing prevalence of remote work and distributed teams has elevated the significance of cloud-based collaboration. By embracing cloud services, organizations can effectively adapt to these changes and gain a competitive edge.

Integration with other tools in cloud architecture allows cloud services to seamlessly integrate with other collaboration and productivity tools used by an organization. This integration helps create a more efficient and streamlined workflow by allowing users to access all their tools and data from a single location.

Some examples of tools that can be integrated with cloud services include project management software, communication and collaboration tools, CRM systems, and email clients. Here are some benefits of integrating cloud services with other tools:

  • Improved productivity: Integration with other tools enables users to access all their data and tools in one place, reducing the need to switch between different applications and improving productivity
  • Better collaboration: Integration with collaboration tools such as instant messaging and video conferencing can improve communication and collaboration among team members
  • Automation: Integration with other tools can enable automation of repetitive tasks, such as data entry and reporting, saving time and reducing the risk of errors
  • Data consistency: Integration with other tools can help ensure data consistency across different systems, reducing the risk of errors and improving data quality
  • Real-time updates: Integration with other tools can enable real-time updates, ensuring that all team members have access to the latest data and information

Cloud computing systems are designed to connect and collaborate with a wide range of existing tools and technologies. This integration enables organizations to use their existing infrastructure, applications, and data within the cloud environment. By integrating with other tools, cloud architecture allows for smooth data transfer, streamlined workflows, and improved interoperability between different systems. The result is a unified and cohesive ecosystem in which organizations use their existing tools and resources alongside cloud services, enhancing the productivity, efficiency, and overall effectiveness of cloud-based solutions.

Integration with other tools is therefore an important aspect of cloud architecture. It helps organizations create a more efficient and streamlined workflow, improving productivity, collaboration, and data quality. The result is a more cohesive and effective technology ecosystem that supports their business objectives.

The following section provides a concise overview of the essential guidelines for designing and implementing effective cloud architectures. It emphasizes key practices such as scalability, high availability, performance optimization, security implementation, cost optimization, automation, and monitoring.

Introducing virtual threads 4 – Concurrency – Virtual Threads, Structured Concurrency

Printing a thread (toString())

If we print a virtual thread (by calling its toString() method), the output will be something like the following:

VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#26,vt-0]/runnable@ForkJoinPool-1-worker-1

In a nutshell, this output can be interpreted as follows. VirtualThread[#22] indicates a virtual thread with the thread identifier #22 and no name (in the case of VirtualThread[#26,vt-0], the id is #26 and the name is vt-0). Then, we have the runnable text, which indicates the state of the virtual thread (runnable means that the virtual thread is running). Next, we have the carrier thread of the virtual thread, which is a platform thread. ForkJoinPool-1-worker-1 contains the platform thread name (worker-1) of the ForkJoinPool (ForkJoinPool-1) that serves by default as the virtual thread scheduler.
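The following small sketch reproduces both forms of the output. The name vt-0 is just an example. The first string is captured inside the running virtual thread, so it is mounted on a carrier and shows the @carrier suffix. The second string is taken after join(), when the thread is no longer mounted and only its own state (terminated) is printed. Thread ids vary from run to run, so the comments describe the format rather than exact values.

```java
class VirtualThreadToString {

  // Returns {toString() while running, toString() after termination}
  static String[] describe() throws InterruptedException {
    String[] inside = new String[1];
    Thread vt = Thread.ofVirtual()
        .name("vt-0")
        .start(() -> inside[0] = Thread.currentThread().toString());
    vt.join(); // join() also makes the write to inside[0] visible here
    return new String[] { inside[0], vt.toString() };
  }

  public static void main(String[] args) throws InterruptedException {
    String[] s = describe();
    System.out.println(s[0]); // format: VirtualThread[#<id>,vt-0]/runnable@<carrier name>
    System.out.println(s[1]); // format: VirtualThread[#<id>,vt-0]/terminated
  }
}
```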

How many virtual threads we can start

Finally, let's run some code that allows us to see how many virtual threads we can create and start:

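import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Warning: this loop never ends; stop the program manually
AtomicLong counterVirtualThreads = new AtomicLong();
while (true) {
  Thread.startVirtualThread(() -> {
    long currentVirtualThreadNr
      = counterVirtualThreads.incrementAndGet();
    System.out.println("Virtual thread: "
      + currentVirtualThreadNr);
    // park forever, so each virtual thread stays alive
    LockSupport.park();
  });
}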

On my machine, this code started to slow down after around 14,000,000 virtual threads. It continued to run slowly as memory became available (the Garbage Collector in action), but it didn't crash. So, massive throughput!

Backward compatibility

Virtual threads are compatible with:

Synchronized blocks

Thread-local variables

Thread and currentThread()

Thread interruption (InterruptedException)

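Basically, virtual threads work out of the box once you update to at least JDK 21 (in JDK 19 and JDK 20, they are a preview feature that must be enabled via --enable-preview). They strongly support clean, readable, and more structured code, and they are the building blocks behind the structured concurrency paradigm.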
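As a quick check of the compatibility list above, the following sketch runs one virtual thread that uses a thread-local variable, a synchronized block, Thread.currentThread(), and interruption. The user name "alice" and the lock object are arbitrary illustration values. The caller interrupts the thread right after starting it. Whether the interrupt arrives before or during sleep(), the sleep ends with an InterruptedException, because sleep() checks the interrupt status.

```java
import java.time.Duration;

class VirtualThreadCompat {

  private static final ThreadLocal<String> USER = new ThreadLocal<>();
  private static final Object LOCK = new Object();
  private static int counter;

  // Runs one virtual thread that exercises the four features and reports what it observed
  static String[] run() throws InterruptedException {
    String[] observed = new String[4];
    Thread vt = Thread.ofVirtual().start(() -> {
      USER.set("alice");                                   // thread-local variable
      observed[0] = USER.get();
      synchronized (LOCK) {                                // synchronized block
        counter++;
        observed[1] = String.valueOf(counter);
      }
      observed[2] = String.valueOf(Thread.currentThread().isVirtual()); // Thread.currentThread()
      try {
        Thread.sleep(Duration.ofSeconds(10));              // interrupted by the caller
        observed[3] = "not interrupted";
      } catch (InterruptedException e) {
        observed[3] = "interrupted";                       // InterruptedException as usual
      }
    });
    vt.interrupt(); // sets the interrupt status of the virtual thread
    vt.join();
    return observed;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(String.join(", ", run()));
  }
}
```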

Avoiding fake conclusions (potentially myths)

  • Virtual threads are faster than platform threads (FAKE!): There can be a great many virtual threads, but they are not faster than classical (platform) threads. They don't boost in-memory computational capabilities (for that, we have parallel streams). Don't conclude that virtual threads do some magic that makes them faster or more optimal for solving a task. Virtual threads can seriously improve throughput (since millions of them can wait for jobs), but they cannot improve latency. However, virtual threads can be launched much faster than platform threads (a virtual thread has a creation time on the order of microseconds and needs memory on the order of kilobytes).
  • Virtual threads should be pooled (FAKE!): Virtual threads should not be part of any thread pool and should never be pooled.
  • Virtual threads are expensive (FAKE!): Virtual threads are not free (nothing is free), but they are cheaper to create, block, and destroy than platform threads. A virtual thread is roughly 1000x cheaper than a platform thread.
  • Virtual threads can release a task (FAKE!): This is not true! A virtual thread takes a task and should either return a result or get interrupted. It cannot release the task.
  • Blocking a virtual thread blocks its carrier thread (FAKE!): Blocking a virtual thread doesn't block its carrier thread, so the carrier thread can serve other virtual threads. The exception is a virtual thread pinned to its carrier, for instance while it runs native code or, before JDK 24, while it blocks inside a synchronized block.
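The throughput-versus-latency point can be seen with a small experiment. The task count of 10,000 and the 1-second sleep are arbitrary choices. Each task blocks for one second, so no single task finishes faster (latency is unchanged). Because a sleeping virtual thread releases its carrier, the whole batch should complete in little more than one second instead of queuing behind a handful of carrier threads. Exact timings depend on the machine.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingThroughput {

  // Starts `tasks` virtual threads that each block for 1 second; returns the elapsed time
  static long runMillis(int tasks) {
    long start = System.nanoTime();
    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
      for (int i = 0; i < tasks; i++) {
        executor.submit(() -> {
          Thread.sleep(Duration.ofSeconds(1)); // blocking call: the carrier is released meanwhile
          return null;
        });
      }
    } // close() waits for all tasks to complete
    return Duration.ofNanos(System.nanoTime() - start).toMillis();
  }

  public static void main(String[] args) {
    System.out.println("10,000 blocking tasks took " + runMillis(10_000) + " ms");
  }
}
```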


212. Using the ExecutorService for virtual threads

Virtual threads allow us to write more expressive and straightforward concurrent code. Thanks to the massive throughput obtained via virtual threads, we can easily adopt the task-per-thread model (for an HTTP server, this means a thread per request; for a database, a thread per transaction; and so on). In other words, we can assign a new virtual thread to each concurrent task. Trying to use the task-per-thread model with platform threads will result in a throughput limited by the number of platform threads the hardware can sustain. This is explained by Little's law (https://en.wikipedia.org/wiki/Little%27s_law), L = λW, where L is the average concurrency, λ is the throughput, and W is the latency. In other words, throughput equals average concurrency divided by latency (λ = L/W).

Whenever possible, it is recommended to avoid interacting with threads directly. The JDK supports this via the ExecutorService/Executor API. More precisely, we are used to submitting a task (Runnable/Callable) to an ExecutorService/Executor and working with the returned Future. This pattern is valid for virtual threads as well.

So, we don't have to write all the plumbing code ourselves to adopt the task-per-thread model for virtual threads. Starting with JDK 19 (as a preview, final in JDK 21), this model is available via the Executors class. More precisely, it is available via the newVirtualThreadPerTaskExecutor() method, which creates an ExecutorService capable of creating an unbounded number of virtual threads that follow the task-per-thread model. This ExecutorService exposes methods for submitting tasks, such as submit() (as you'll see next) and invokeAll()/invokeAny() (as you'll see later). These methods return Future objects containing either an exception or a result.
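As a worked illustration of Little's law, with hypothetical numbers, assume every task has a latency of W = 50 ms. With W fixed, throughput can only grow by raising the number of concurrent tasks, which means the number of threads in the task-per-thread model:

```latex
\lambda = \frac{L}{W}, \qquad W = 0.05\,\text{s}

L = 200 \text{ threads} \;\Rightarrow\; \lambda = \frac{200}{0.05\,\text{s}} = 4{,}000 \text{ tasks/s}

L = 10{,}000 \text{ threads} \;\Rightarrow\; \lambda = \frac{10{,}000}{0.05\,\text{s}} = 200{,}000 \text{ tasks/s}
```

The second line is affordable only if threads are cheap enough to have 10,000 of them, which is the case for virtual threads.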

Starting with JDK 19, ExecutorService extends the AutoCloseable interface. In other words, we can use an ExecutorService in a try-with-resources pattern; its close() method waits for all the submitted tasks to complete.

Consider the following simple Runnable and Callable:

Runnable taskr = () -> logger.info(
  Thread.currentThread().toString());
      
Callable<Boolean> taskc = () -> {
  logger.info(Thread.currentThread().toString());
  return true;
};

Executing the Runnable/Callable can be done as follows (here, NUMBER_OF_TASKS = 15, and each iteration submits both the Runnable and the Callable):

try (ExecutorService executor
      = Executors.newVirtualThreadPerTaskExecutor()) {
  for (int i = 0; i < NUMBER_OF_TASKS; i++) {
    executor.submit(taskr); // executing Runnable
    executor.submit(taskc); // executing Callable
  }
}

Of course, for both the Runnable and the Callable, we can capture a Future and act accordingly via the blocking get() method, or do whatever else we need with it:

Future<?> futurer = executor.submit(taskr);
Future<Boolean> futurec = executor.submit(taskc);
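Here is a self-contained sketch of capturing and consuming those futures. It prints with System.out instead of the logger used above. For a Runnable, get() completes with null. For the Callable, get() returns the computed value (true). Both calls block until the corresponding task has finished.

```java
import java.util.concurrent.*;

class FutureCapture {

  static Object[] run() throws InterruptedException, ExecutionException {
    Runnable taskr = () -> System.out.println(Thread.currentThread());
    Callable<Boolean> taskc = () -> {
      System.out.println(Thread.currentThread());
      return true;
    };
    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
      Future<?> futurer = executor.submit(taskr);
      Future<Boolean> futurec = executor.submit(taskc);
      // get() blocks until each task completes
      return new Object[] { futurer.get(), futurec.get() };
    }
  }

  public static void main(String[] args) throws Exception {
    Object[] results = run();
    System.out.println("Runnable -> " + results[0] + ", Callable -> " + results[1]);
  }
}
```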

A possible output looks as follows:

VirtualThread[#28]/runnable@ForkJoinPool-1-worker-6
VirtualThread[#31]/runnable@ForkJoinPool-1-worker-5
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-7
VirtualThread[#25]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#24]/runnable@ForkJoinPool-1-worker-2
VirtualThread[#27]/runnable@ForkJoinPool-1-worker-5
VirtualThread[#26]/runnable@ForkJoinPool-1-worker-4
VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#36]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#37]/runnable@ForkJoinPool-1-worker-2
VirtualThread[#35]/runnable@ForkJoinPool-1-worker-7
VirtualThread[#34]/runnable@ForkJoinPool-1-worker-4
VirtualThread[#32]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#33]/runnable@ForkJoinPool-1-worker-2
VirtualThread[#30]/runnable@ForkJoinPool-1-worker-1

Check out the virtual threads' ids. They range from #22 to #37 without repetition. Each task is executed by its own virtual thread.

The task-per-thread model is also available for classical threads via newThreadPerTaskExecutor(ThreadFactory threadFactory). Here is an example:

static class SimpleThreadFactory implements ThreadFactory {
  @Override
  public Thread newThread(Runnable r) {
    return new Thread(r);                     // classic
   // return Thread.ofVirtual().unstarted(r); // virtual
  }
}
try (ExecutorService executor =
      Executors.newThreadPerTaskExecutor(
          new SimpleThreadFactory())) {
  for (int i = 0; i < NUMBER_OF_TASKS; i++) {
    executor.submit(taskr); // executing Runnable
    executor.submit(taskc); // executing Callable
  }
}

As you can see, newThreadPerTaskExecutor() can be used for classic threads or for virtual threads. The number of created threads is unbounded. By simply modifying the thread factory, we can switch between virtual and classic threads.

A possible output looks as follows:

Thread[#75,Thread-15,5,main]
Thread[#77,Thread-17,5,main]
Thread[#76,Thread-16,5,main]
Thread[#83,Thread-23,5,main]
Thread[#82,Thread-22,5,main]
Thread[#80,Thread-20,5,main]
Thread[#81,Thread-21,5,main]
Thread[#79,Thread-19,5,main]
Thread[#78,Thread-18,5,main]
Thread[#89,Thread-29,5,main]
Thread[#88,Thread-28,5,main]
Thread[#87,Thread-27,5,main]
Thread[#86,Thread-26,5,main]
Thread[#85,Thread-25,5,main]
Thread[#84,Thread-24,5,main]

Check out the threads' ids. They range from #75 to #89 without repetition. Each task is executed by its own thread.
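Instead of a hand-written factory like SimpleThreadFactory, the JDK's own builders can supply the factory: Thread.ofPlatform().factory() and Thread.ofVirtual().factory(). The sketch below runs the same tasks with each factory. For every task, it records the id of the executing thread and whether that thread is virtual. Distinct ids confirm the one-thread-per-task behavior.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class FactorySwitch {

  // Runs n tasks and collects, per task, "<threadId>:<isVirtual>"
  static Set<String> run(ThreadFactory factory, int n) {
    Set<String> seen = ConcurrentHashMap.newKeySet();
    try (ExecutorService executor = Executors.newThreadPerTaskExecutor(factory)) {
      for (int i = 0; i < n; i++) {
        executor.submit(() -> {
          Thread t = Thread.currentThread();
          seen.add(t.threadId() + ":" + t.isVirtual());
        });
      }
    } // close() waits for all tasks
    return seen;
  }

  public static void main(String[] args) {
    System.out.println("platform: " + run(Thread.ofPlatform().factory(), 5));
    System.out.println("virtual:  " + run(Thread.ofVirtual().factory(), 5));
  }
}
```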


214. Hooking virtual threads and sync code

The goal of this problem is to highlight how virtual threads interact with synchronous code. For this, we use the built-in java.util.concurrent.SynchronousQueue. This is a blocking queue with no internal capacity, so it never holds elements for later. More precisely, a thread that wants to insert an element into this queue is blocked until another thread attempts to remove an element from it, and vice versa. Basically, a thread cannot insert an element unless another thread attempts to remove one.

Let's assume that a virtual thread attempts to insert into a SynchronousQueue, while a platform thread attempts to remove from this queue. In code lines, we have:

SynchronousQueue<Integer> queue = new SynchronousQueue<>();
Runnable task = () -> {
  logger.info(() -> Thread.currentThread().toString()
    + " sleeps for 5 seconds");
  try {
    Thread.sleep(Duration.ofSeconds(5));
    logger.info(() -> Thread.currentThread().toString()
      + " inserts in the queue");
    // put() blocks until another thread takes the element
    queue.put(Integer.MAX_VALUE);
  } catch (InterruptedException ex) {
    Thread.currentThread().interrupt(); // restore the interrupt status
  }
};
logger.info("Before running the task …");
Thread vThread = Thread.ofVirtual().start(task);
logger.info(vThread.toString());

So, the virtual thread (vThread) waits for 5 seconds before attempting to insert an element in the queue. However, it will not successfully insert an element until another thread attempts to remove an element from this queue:

logger.info(() -> Thread.currentThread().toString()
  + " can't take from the queue yet");
int maxint = queue.take();
logger.info(() -> Thread.currentThread().toString()
  + "took from queue: " + maxint);

logger.info(vThread.toString());
logger.info("After running the task …");

Here, Thread.currentThread() refers to the main thread of the application, which is a platform thread; starting vThread doesn't block it. The main thread can remove from the queue only when another thread (here, vThread) inserts into it, so it blocks in queue.take() until then.

The output of this code looks as follows:

[09:41:59] Before running the task …
[09:42:00] VirtualThread[#22]/runnable
[09:42:00] Thread[#1,main,5,main]
           can’t take from the queue yet
[09:42:00] VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
           sleeps for 5 seconds
[09:42:05] VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
           inserts in the queue
[09:42:05] Thread[#1,main,5,main]took from queue: 2147483647
[09:42:05] VirtualThread[#22]/terminated
[09:42:05] After running the task …

The virtual thread started its execution (it is in the runnable state), but the main thread cannot remove from the queue until the virtual thread inserts an element, so it is blocked by the queue.take() operation:

[09:42:00] VirtualThread[#22]/runnable
[09:42:00] Thread[#1,main,5,main]
           can’t take from the queue yet

Meanwhile, the virtual thread sleeps for 5 seconds (during this time, the main thread has nothing to do) and afterward, it inserts an element:

[09:42:00] VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
           sleeps for 5 seconds
[09:42:05] VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
           inserts in the queue

The virtual thread has inserted an element into the queue, so the main thread can remove this element from it:

[09:42:05] Thread[#1,main,5,main]took from queue: 2147483647

The virtual thread is also terminated:

[09:42:05] VirtualThread[#22]/terminated

So, virtual threads, platform threads, and synchronous code work together as expected. In the bundled code, you can find an example where the virtual and platform threads swap roles: the platform thread attempts to insert, and the virtual thread attempts to remove.
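The following is a compact, self-contained sketch of both directions of the hand-off. It is not the bundled code. The sleep is shortened to 200 ms, and there is no logging. In run(), the virtual thread puts and the main (platform) thread takes. In runSwapped(), the roles are swapped. In both cases, put() and take() wait for each other, so each method returns only after the element has changed hands.

```java
import java.time.Duration;
import java.util.concurrent.SynchronousQueue;

class HandOff {

  // Virtual thread inserts, platform (calling) thread removes
  static int run() throws InterruptedException {
    SynchronousQueue<Integer> queue = new SynchronousQueue<>();
    Thread vThread = Thread.ofVirtual().start(() -> {
      try {
        Thread.sleep(Duration.ofMillis(200));
        queue.put(Integer.MAX_VALUE); // blocks until a taker arrives
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    int value = queue.take(); // the calling thread blocks here until vThread inserts
    vThread.join();
    return value;
  }

  // Platform (calling) thread inserts, virtual thread removes
  static int runSwapped() throws InterruptedException {
    SynchronousQueue<Integer> queue = new SynchronousQueue<>();
    int[] taken = new int[1];
    Thread vThread = Thread.ofVirtual().start(() -> {
      try {
        taken[0] = queue.take(); // blocks until the calling thread inserts
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    queue.put(Integer.MAX_VALUE); // blocks until vThread takes the element
    vThread.join();               // join() makes taken[0] visible here
    return taken[0];
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("took: " + run() + ", swapped took: " + runSwapped());
  }
}
```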


217. Introducing the ExecutorService invoke all/any for virtual threads – part 2

Earlier, in Problem 203, we wrote a piece of "unstructured" concurrency code for building a testing team of three testers served by an external server.

Now, let's try to rewrite the buildTestingTeam() method via invokeAll()/invokeAny() and newVirtualThreadPerTaskExecutor(). If we rely on invokeAll(), then the application will attempt to load three testers by id as follows:

public static TestingTeam buildTestingTeam()
      throws InterruptedException {
      
  try (ExecutorService executor
      = Executors.newVirtualThreadPerTaskExecutor()) {
    List<Future<String>> futures = executor.invokeAll(
      List.of(() -> fetchTester(1),
              () -> fetchTester(2),
              () -> fetchTester(3)));
          
    futures.forEach(f -> logger.info(() -> "State: "
      + f.state()));
    return new TestingTeam(futures.get(0).resultNow(),
      futures.get(1).resultNow(), futures.get(2).resultNow());                      
  }      
}

We have three testers with ids 1, 2, and 3. So, the output will be:

[07:47:32] State: SUCCESS
[07:47:32] State: SUCCESS
[07:47:32] State: SUCCESS

In the next problem, we will see how we can make decisions based on task state.

If we can handle the testing phase even with a single tester, then we can rely on invokeAny() as follows:

public static TestingTeam buildTestingTeam()
      throws InterruptedException, ExecutionException {
      
  try (ExecutorService executor
    = Executors.newVirtualThreadPerTaskExecutor()) {
    String result = executor.invokeAny(
      List.of(() -> fetchTester(1),
              () -> fetchTester(2),
              () -> fetchTester(3)));
          
    logger.info(result);
    return new TestingTeam(result);
  }      
}

This code will return a single result representing one of these three testers. If none of them is available, then invokeAny() throws an ExecutionException wrapping the exception thrown by the tasks (here, a UserNotFoundException).
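To see both outcomes of invokeAny() in isolation, here is a sketch with a hypothetical stand-in for the book's fetchTester(). In this stand-in, only id 2 exists, and the other ids throw an IllegalArgumentException in place of the book's UserNotFoundException. With ids 1, 2, and 3, the only successful task wins. With ids 1 and 3, every task fails, and invokeAny() throws an ExecutionException.

```java
import java.util.List;
import java.util.concurrent.*;

class InvokeAnySketch {

  // Hypothetical stand-in for fetchTester(): only tester 2 exists
  static String fetchTester(int id) {
    if (id != 2) {
      throw new IllegalArgumentException("Tester " + id + " not found");
    }
    return "tester-" + id;
  }

  static String firstAvailable(List<Integer> ids)
      throws InterruptedException, ExecutionException {
    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
      List<Callable<String>> tasks = ids.stream()
          .map(id -> (Callable<String>) () -> fetchTester(id))
          .toList();
      return executor.invokeAny(tasks); // first successful result; the others are canceled
    }
  }

  public static void main(String[] args) throws InterruptedException {
    try {
      System.out.println(firstAvailable(List.of(1, 2, 3)));
      firstAvailable(List.of(1, 3));
    } catch (ExecutionException e) {
      System.out.println("No tester available: " + e.getCause());
    }
  }
}
```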

218. Hooking task state

Starting with JDK 19, we can rely on Future#state(). This method computes the state of a Future based on the well-known get(), isDone(), and isCancelled() methods and returns a Future.State enum entry, as follows:

CANCELLED – The task was canceled.

FAILED – The task was completed exceptionally (with an exception).

RUNNING – The task is still running (has not been completed).

SUCCESS – The task was completed normally with a result (no exception).

In the following snippet of code, we analyze the state of loading the testing team members and act accordingly:

public static TestingTeam buildTestingTeam()
       throws InterruptedException {
  List<String> testers = new ArrayList<>();
      
  try (ExecutorService executor
      = Executors.newVirtualThreadPerTaskExecutor()) {
    List<Future<String>> futures = executor.invokeAll(
      List.of(() -> fetchTester(Integer.MAX_VALUE),
              () -> fetchTester(2),
              () -> fetchTester(Integer.MAX_VALUE)));
    futures.forEach(f -> {
      logger.info(() -> "Analyzing " + f + " state …");

      switch (f.state()) {
        case RUNNING -> throw new IllegalStateException(
          "Future is still in the running state …");
        case SUCCESS -> {
          logger.info(() -> "Result: " + f.resultNow());
          testers.add(f.resultNow());
        }
        case FAILED ->
          logger.severe(() -> "Exception: "
            + f.exceptionNow().getMessage());
        case CANCELLED ->
          logger.info("Cancelled ?!?");
      }
    });
  }
      
  return new TestingTeam(testers.toArray(String[]::new));
}

We know that when the execution reaches the switch block, the Future objects should have completed, either normally or exceptionally. So, if the current Future state is RUNNING, then this is a really weird situation (possibly a bug), and we throw an IllegalStateException. Next, if the Future state is SUCCESS (fetchTester(2)), then we have a result that can be obtained via resultNow(). This method was added in JDK 19, and it is useful when we know that we have a result. The resultNow() method returns immediately without waiting (unlike get()); calling it on a Future that did not complete with a result throws an IllegalStateException. If the state is FAILED (fetchTester(Integer.MAX_VALUE)), then we log the exception via exceptionNow(). This method was also added in JDK 19, and it immediately returns the underlying exception of a failed Future. Finally, if the Future was canceled, then there is nothing to do; we just report it in the log.
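The same state-driven logic can be tried without the book's fetchTester(). The sketch below uses two hypothetical tasks: one returns "ok" and the other throws an IllegalStateException("boom"). It maps each Future to a description through an exhaustive switch expression over Future.State. resultNow() and exceptionNow() are called only in the arms where they are guaranteed to succeed.

```java
import java.util.List;
import java.util.concurrent.*;

class FutureStateSketch {

  static List<String> describe() throws InterruptedException {
    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
      List<Callable<String>> tasks = List.of(
          () -> "ok",
          () -> { throw new IllegalStateException("boom"); });
      List<Future<String>> futures = executor.invokeAll(tasks); // same order as tasks
      return futures.stream().map(f -> switch (f.state()) {
        case SUCCESS -> "SUCCESS: " + f.resultNow();
        case FAILED -> "FAILED: " + f.exceptionNow().getMessage();
        case CANCELLED -> "CANCELLED";
        case RUNNING -> throw new IllegalStateException("still running after invokeAll()");
      }).toList();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    describe().forEach(System.out::println);
  }
}
```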


216. Introducing the ExecutorService invoke all/any for virtual threads – part 1

We have introduced the ExecutorService’s invokeAll()/invokeAny() in Java Coding Problems, First Edition, Chapter 10, Problem 207.

Working with invokeAll()

In a nutshell, invokeAll() executes a collection of tasks (Callable) and returns a List<Future> that holds the results/status of each task. The tasks can finish naturally or be forced to stop by a given timeout. Each task can finish successfully or exceptionally. Upon return, all the tasks that have not completed yet (which can happen only when a timeout is given) are automatically canceled. We can check the status of each task via Future#isDone() and Future#isCancelled().

<T> List<Future<T>> invokeAll(Collection<? extends
  Callable<T>> tasks) throws InterruptedException
<T> List<Future<T>> invokeAll(
  Collection<? extends Callable<T>> tasks, long timeout,
    TimeUnit unit) throws InterruptedException

Using invokeAll() with virtual threads via newVirtualThreadPerTaskExecutor() (or newThreadPerTaskExecutor()) is straightforward. For instance, here is a simple example of executing three Callable instances:

try (ExecutorService executor
    = Executors.newVirtualThreadPerTaskExecutor()) {
  List<Future<String>> futures = executor.invokeAll(
    List.of(() -> "pass01", () -> "pass02", () -> "pass03"));

  futures.forEach(f -> logger.info(() ->
    "State: " + f.state()));
}

Have you spotted the f.state() call? This API was introduced in JDK 19 and it computes the state of a future based on the well-known get(), isDone(), and isCancelled(). While we will detail this in a subsequent problem, at this moment the output will be as follows:

[10:17:41] State: SUCCESS
[10:17:41] State: SUCCESS
[10:17:41] State: SUCCESS

The three tasks have successfully completed.
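The timeout variant shows the automatic cancellation mentioned above. In this sketch, the task bodies and the 1-second timeout are arbitrary. One task returns immediately and the other would sleep for 10 seconds. When the timeout expires, invokeAll() returns, and the unfinished task is canceled, so its state is CANCELLED.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.*;

class InvokeAllTimeout {

  static List<Future.State> run() throws InterruptedException {
    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
      List<Callable<String>> tasks = List.of(
          () -> "fast",
          () -> { Thread.sleep(Duration.ofSeconds(10)); return "slow"; });
      // Tasks not done when the timeout expires are canceled (and interrupted)
      List<Future<String>> futures = executor.invokeAll(tasks, 1, TimeUnit.SECONDS);
      return futures.stream().map(Future::state).toList();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(run());
  }
}
```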

Working with invokeAny()

In a nutshell, invokeAny() executes a collection of tasks (Callable) and strives to return the result of a task that has completed successfully (before the given timeout, if any). Upon return, all the tasks that have not completed are automatically canceled.

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
  throws InterruptedException, ExecutionException
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
  long timeout, TimeUnit unit) throws InterruptedException,
    ExecutionException, TimeoutException

Using invokeAny() with virtual threads via newVirtualThreadPerTaskExecutor() (or newThreadPerTaskExecutor(ThreadFactory)) is also straightforward. For instance, here is a simple example that executes three Callable instances when we are interested in a single result:

try (ExecutorService executor
    = Executors.newVirtualThreadPerTaskExecutor()) {
  // returns the result of one task that completed successfully
  String result = executor.invokeAny(
    List.of(() -> "pass01", () -> "pass02", () -> "pass03"));

  logger.info(result);
}

A possible output can be:

[10:29:33] pass02

This output corresponds to the second Callable. In the next problem, we will come up with a more realistic example.
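To see the "completed successfully" part of invokeAny() at work, here is a minimal sketch (the class InvokeAnyFailures and its helper firstSuccessful() are hypothetical names, not the book's bundled code). A task that throws is never picked; only when every task fails does invokeAny() give up with an ExecutionException:

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class InvokeAnyFailures {

    // Returns the result picked by invokeAny(), or "all failed" when no task succeeds
    static String firstSuccessful(List<Callable<String>> tasks) {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            return executor.invokeAny(tasks);
        } catch (ExecutionException e) {
            // invokeAny() throws ExecutionException only if no task completed successfully
            return "all failed";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Two tasks throw, so the only possible result is "pass03"
        System.out.println(firstSuccessful(List.of(
            () -> { throw new IllegalStateException("fail01"); },
            () -> { throw new IllegalStateException("fail02"); },
            () -> "pass03")));

        // Every task throws, so invokeAny() ends with an ExecutionException
        System.out.println(firstSuccessful(List.of(
            () -> { throw new IllegalStateException("fail01"); },
            () -> { throw new IllegalStateException("fail02"); })));
    }
}
```

Unlike the three-Callable example, the first run has exactly one possible result, because only one of its tasks can complete successfully.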

Introducing scope object (StructuredTaskScope)

First, we create a StructuredTaskScope in a try-with-resources statement (StructuredTaskScope implements AutoCloseable):

try (StructuredTaskScope<String> scope
      = new StructuredTaskScope<>()) {
    …
}

The scope is a wrapper for the virtual threads’ lifetime. We use the scope to fork as many virtual threads (subtasks) as needed via the fork(Callable task) method. Here, we fork only one virtual thread and get back a Subtask (forking is a non-blocking operation):

Subtask<String> subtask = scope.fork(() -> fetchTester(1));

Next, we have to call the join() method (or joinUntil(Instant deadline)). This method waits for all threads (all Subtask instances) forked from this scope to complete, so it is a blocking operation. A scope should block only while waiting for its subtasks to complete, and this happens via join() or joinUntil().

scope.join();          

When execution passes this line, we know that all threads (all Subtask instances) forked from this scope have completed with a result or an exception (each subtask runs independently, so each of them can complete with a result or an exception). Here, we call the non-blocking get() method to obtain the result. Pay attention: calling get() on a subtask that has not completed successfully, or before the owner has joined, throws an IllegalStateException (for instance, IllegalStateException("Owner did not join after forking subtask") when join() was skipped):

String result = subtask.get();

On the other hand, we can obtain the exception of a failed subtask via exception(). However, if we call exception() on a subtask that completed with a result, we get back an IllegalStateException("Subtask not completed or did not complete with exception"). So, if you are not sure whether your subtask(s) completed with a result or with an exception, it is better to call get() or exception() only after checking the state of the corresponding Subtask. A state of SUCCESS safely allows you to call get(), while a state of FAILED safely allows you to call exception(). So, in our case, we may prefer it this way:

String result = "";
// get() is safe only for a subtask in the SUCCESS state
if (subtask.state() == Subtask.State.SUCCESS) {
  result = subtask.get();
}

Besides Subtask.State.SUCCESS and Subtask.State.FAILED, we also have Subtask.State.UNAVAILABLE, which means that the subtask's result or exception is not available (for instance, the subtask is still running, or it completed or was forked after the scope was shut down). That's all!

ExecutorService vs. StructuredTaskScope

The previous code looks like the code that we would write via a classical ExecutorService, but there are two big differences between these solutions. First, a classical ExecutorService holds precious platform threads and allows us to pool them. A StructuredTaskScope, on the other hand, is just a thin launcher for virtual threads, which are cheap and shouldn't be pooled. So, once we've done our job, a StructuredTaskScope can be discarded and garbage collected. Second, a classical ExecutorService (such as a ThreadPoolExecutor) holds a single queue for all the tasks, and its threads take tasks from this queue whenever they get the chance. The virtual threads forked by a StructuredTaskScope are scheduled by the JDK's virtual-thread scheduler, a ForkJoinPool in which each carrier (platform) thread has its own work queue, and an idle carrier can steal tasks from another carrier's queue. This is known as the work-stealing pattern, and it was covered in Java Coding Problems, First Edition, Chapter 11.
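The pooling difference can be observed with two plain executors, without StructuredTaskScope. This minimal sketch compares a fixed pool of two platform threads with newVirtualThreadPerTaskExecutor(), used here only as a stand-in for "one cheap virtual thread per task"; the class PoolingVsPerTask and the Observation record are hypothetical names (Thread#threadId() requires JDK 19+):

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

class PoolingVsPerTask {

    // How many distinct threads ran the tasks, and whether all of them were virtual
    record Observation(int distinctThreads, boolean allVirtual) {}

    static Observation observe(ExecutorService executor, int taskCount) {
        Set<Long> threadIds = ConcurrentHashMap.newKeySet();
        AtomicBoolean allVirtual = new AtomicBoolean(true);

        List<Callable<Void>> tasks = IntStream.range(0, taskCount)
            .mapToObj(i -> (Callable<Void>) () -> {
                Thread current = Thread.currentThread();
                threadIds.add(current.threadId());
                if (!current.isVirtual()) {
                    allVirtual.set(false);
                }
                return null;
            })
            .toList();

        try (executor) { // close() waits for the tasks, then shuts the executor down
            executor.invokeAll(tasks);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new Observation(threadIds.size(), allVirtual.get());
    }

    public static void main(String[] args) {
        // two pooled platform threads are reused for all 100 tasks
        System.out.println(observe(Executors.newFixedThreadPool(2), 100));
        // prints Observation[distinctThreads=2, allVirtual=false]

        // a new virtual thread is started for each of the 100 tasks
        System.out.println(observe(Executors.newVirtualThreadPerTaskExecutor(), 100));
        // prints Observation[distinctThreads=100, allVirtual=true]
    }
}
```

The pooled executor reuses its two threads for all the tasks, while the per-task executor starts a new virtual thread every time, which is affordable because virtual threads are cheap to create.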


222. Introducing ShutdownOnFailure

As its name suggests, StructuredTaskScope.ShutdownOnFailure is capable of returning the exception of the first subtask that completes exceptionally, and it interrupts the rest of the subtasks (threads). For instance, we may want to fetch the testers with ids 1, 2, and 3. Since we need exactly these three testers, we want to be informed if any of them is not available and cancel everything. The code looks as follows:

public static TestingTeam buildTestingTeam()
       throws InterruptedException, ExecutionException {

  try (ShutdownOnFailure scope
      = new StructuredTaskScope.ShutdownOnFailure()) {
    Subtask<String> subtask1
      = scope.fork(() -> fetchTester(1));
    Subtask<String> subtask2
      = scope.fork(() -> fetchTester(2));
    // there is no tester with this id, so this subtask fails
    Subtask<String> subtask3
      = scope.fork(() -> fetchTester(Integer.MAX_VALUE));
    scope.join();
    logger.info(() -> "Subtask-1 state: " + subtask1.state());
    logger.info(() -> "Subtask-2 state: " + subtask2.state());
    logger.info(() -> "Subtask-3 state: " + subtask3.state());
    Optional<Throwable> exception = scope.exception();
    if (exception.isEmpty()) {
      logger.info(() -> "Subtask-1 result: " + subtask1.get());
      logger.info(() -> "Subtask-2 result: " + subtask2.get());
      logger.info(() -> "Subtask-3 result: " + subtask3.get());

      return new TestingTeam(
        subtask1.get(), subtask2.get(), subtask3.get());
    } else {
      logger.info(() -> exception.get().getMessage());
      scope.throwIfFailed();
    }
  }

  return new TestingTeam();
}

In this example, we intentionally replaced id 3 with Integer.MAX_VALUE. Since there is no tester with this id, the server will throw UserNotFoundException. This means that the states of the subtasks will reveal that the third subtask has failed:

[16:41:15] Subtask-1 state: SUCCESS
[16:41:15] Subtask-2 state: SUCCESS
[16:41:15] Subtask-3 state: FAILED

Moreover, when we call the exception() method, we get back an Optional<Throwable> containing this exception (deep coverage of the Optional feature is available in Java Coding Problems, First Edition, Chapter 12). If we decide to throw it, then we simply call the throwIfFailed() method, which wraps the original exception (the cause) in an ExecutionException and throws it. The message of the exception in our case will be:

Exception in thread "main"
 java.util.concurrent.ExecutionException:
  modern.challenge.UserNotFoundException: Code: 404

If we remove the logging and state-checking code, then we can compact the previous code as follows:

public static TestingTeam buildTestingTeam()
       throws InterruptedException, ExecutionException {
      
  try (ShutdownOnFailure scope
      = new StructuredTaskScope.ShutdownOnFailure()) {
    Subtask<String> subtask1
      = scope.fork(() -> fetchTester(1));
    Subtask<String> subtask2
      = scope.fork(() -> fetchTester(2));
    Subtask<String> subtask3
      = scope.fork(() -> fetchTester(
      Integer.MAX_VALUE)); // this causes exception
    scope.join();
    scope.throwIfFailed();
    // because we have an exception the following
    // code will not be executed
    return new TestingTeam(
      subtask1.get(), subtask2.get(), subtask3.get());          
  }
}

If no exception occurs, then throwIfFailed() doesn't do anything and all three testers are available. The result of each Subtask is available via the non-blocking Subtask.get().

Under the ShutdownOnFailure umbrella, the first subtask that completes exceptionally is the one chosen to produce the exception. If all subtasks complete normally, we don't get any exception. On the other hand, if no subtask completed exceptionally but subtasks were canceled, then ShutdownOnFailure will throw CancellationException.