Introducing structured concurrency (continued)

We skip the rest of the code since you can find it in the bundled code.

Of course, we can implement code answers to each of these questions via error handling, task abandonment and abortion, ExecutorService, and so on, but this means a lot of work for the developer. Writing failsafe solutions that carefully cover all possible scenarios across multiple tasks/subtasks while tracking their progress in a concurrent environment is not an easy job. Not to mention how hard it is for another developer (or even the same developer after a few months or 1-2 years) to understand and maintain the resulting code.

It is time to add some structure to this code, so let’s introduce structured concurrency (or Project Loom).

Structured concurrency relies on several pillars meant to bring lightweight concurrency to Java. The fundamental pillar or principle of structured concurrency is highlighted next.

The fundamental principle of structured concurrency: When a task has to be solved concurrently then all the threads needed to solve it are spun and rejoined in the same block of code. In other words, all these threads’ lifetime is bound to the block’s scope, so we have clear and explicit entry-exit points for each concurrent code block.

Based on this principle, the thread that initiates a concurrent context is the parent-thread or the owner-thread. All threads started by the parent-thread are child-threads or forks, so, among themselves, these threads are siblings. Together, the parent-thread and the child-threads define a parent-child hierarchy.

Putting the structured concurrency principle into a diagram looks as follows:

Figure 10.2 – Parent-child hierarchy in structured concurrency

In the context of the parent-child hierarchy, we have support for error/exception handling with short-circuiting, cancellation propagation, and monitoring/observability:

  • Error/exception handling with short-circuiting: If a child-thread fails, then all the remaining child-threads are canceled unless they are already complete. For instance, if futureTester(1) fails, then futureTester(2) and futureTester(3) are automatically canceled.
  • Cancellation propagation: If the parent-thread is interrupted before joining the child-threads is over, then these forks (the child-threads/subtasks) are canceled automatically. For instance, if the thread executing buildTestingTeam() gets interrupted, then its three forks are automatically canceled.
  • Monitoring/observability: A thread dump reveals the entire parent-child hierarchy in a crystal-clear manner, no matter how many levels have been spawned. Moreover, in structured concurrency, we take advantage of the scheduling and memory management of threads.

While these are pure concepts, writing code that respects and follows them requires the proper API and the following awesome callout:

Figure 10.3 – Don’t reuse virtual threads

Cut this out and stick it somewhere to see it every day! So, in structured concurrency, don’t reuse virtual threads. I know what you are thinking: hey dude, threads are expensive and limited, so we have to reuse them. A quick hint: we are talking about virtual threads (massive throughput), not classical threads, but this topic is covered in the next problem.

Cloud computing today (continued)

While cloud computing offers many benefits, it also comes with several challenges that must be addressed. One of the primary challenges is security, as cloud providers must ensure that users’ data is protected from unauthorized access or disclosure. Another challenge is vendor lock-in, as users may find it difficult to switch between cloud providers due to differences in technologies and architectures. Finally, there is the challenge of managing cloud costs, as users must carefully monitor and optimize their resource usage to avoid unexpected expenses.

Despite these challenges, cloud computing has become an essential part of the modern technology landscape, enabling businesses and individuals to access and use technology more efficiently and effectively than ever before.

The following figure depicts the general idea behind cloud computing:

Figure 1.1 – The versatility and flexibility of cloud computing

This figure provides a concise overview of cloud computing, featuring key components such as databases, applications, compute, mobile devices, servers, and storage. It also highlights the different cloud deployment models: public, private, and hybrid clouds, showcasing the interconnected nature of cloud computing.

With cloud computing, organizations can access technology resources as needed, without having to invest in and manage on-premises infrastructure. This allows companies to focus on their core business, while the cloud service provider manages the underlying technology. There are three main types of cloud computing: public cloud, private cloud, and hybrid cloud. The following figure depicts the basic design of cloud technology:

Figure 1.2 – Basic cloud design

The preceding figure depicts how basic cloud components reside within the cloud.

In this section, you learned about the origins and evolution of cloud computing, from time-sharing to the commercialization of services. You gained insights into key milestones, such as the development of virtualization technologies and the rise of utility computing.

Next, you explored the current state of cloud computing, including its models (IaaS, PaaS, and SaaS).

The next section dives into the foundational aspects of cloud architecture and provides you with a comprehensive understanding of its key components and design principles. It explores the fundamental building blocks of cloud architecture, including virtualization, resource pooling, and on-demand self-service.

The benefits of cloud architecture (continued)

Cloud services provide a range of collaboration tools that enable teams to work together more efficiently and productively. Some of the key collaboration features provided by cloud services are as follows:

  • Real-time collaboration: Cloud services provide real-time collaboration features such as co-authoring, commenting, and chat, allowing teams to work on the same document or project simultaneously and communicate with each other in real time
  • Shared storage: Cloud services provide shared storage, making it easier for teams to access and share files and documents, regardless of their location or device
  • Version control: Cloud services offer version control features that allow teams to track changes made to documents and restore previous versions if necessary
  • Integration with other tools: Cloud services integrate with a wide range of other collaboration tools such as project management tools, instant messaging, and video conferencing, providing a seamless collaboration experience
  • Access control: Cloud services provide access control features that enable teams to control who has access to their files and documents, ensuring that sensitive data is protected
  • Mobile access: Cloud services are accessible from anywhere, on any device, making it easy for teams to collaborate even when they are not in the same location

Cloud-based collaboration tools provided by cloud architecture can help organizations improve their productivity, streamline their workflows, and foster better collaboration among their teams. In today’s fast-paced business environment, the increasing prevalence of remote work and distributed teams has elevated the significance of cloud-based collaboration. By embracing cloud services, organizations can effectively adapt to these changes and gain a competitive edge.

Integration with other tools in cloud architecture allows cloud services to seamlessly integrate with other collaboration and productivity tools used by an organization. This integration helps create a more efficient and streamlined workflow by allowing users to access all their tools and data from a single location.

Some examples of tools that can be integrated with cloud services include project management software, communication and collaboration tools, CRM systems, and email clients. Here are some benefits of integrating cloud services with other tools:

  • Improved productivity: Integration with other tools enables users to access all their data and tools in one place, reducing the need to switch between different applications and improving productivity
  • Better collaboration: Integration with collaboration tools such as instant messaging and video conferencing can improve communication and collaboration among team members
  • Automation: Integration with other tools can enable automation of repetitive tasks, such as data entry and reporting, saving time and reducing the risk of errors
  • Data consistency: Integration with other tools can help ensure data consistency across different systems, reducing the risk of errors and improving data quality
  • Real-time updates: Integration with other tools can enable real-time updates, ensuring that all team members have access to the latest data and information

Cloud computing systems are designed to connect and collaborate with a wide range of existing tools and technologies. This integration enables organizations to leverage their existing infrastructure, applications, and data seamlessly within the cloud environment, allowing for smooth data transfer, streamlined workflows, and improved interoperability between different systems. It enhances productivity, efficiency, and the overall effectiveness of cloud-based solutions by providing a unified and cohesive ecosystem in which organizations can use their existing tools and resources alongside cloud services. Integration with other tools is an important aspect of cloud architecture because it helps organizations create a more efficient and streamlined workflow, improving productivity, collaboration, and data quality, and building a technology ecosystem that supports their business objectives.

The following section provides a concise overview of the essential guidelines for designing and implementing effective cloud architectures. It emphasizes key practices such as scalability, high availability, performance optimization, security implementation, cost optimization, automation, and monitoring.

Introducing virtual threads (continued)

Waiting for a virtual task to terminate

The given task is executed by a virtual thread, while the main thread is not blocked. In order to wait for the virtual thread to terminate, we have to call one of the join() flavors. We have join() without arguments, which waits indefinitely, and a few flavors that wait for a given time (for instance, join(Duration duration) and join(long millis)):

vThread.join();

These methods throw an InterruptedException, so you have to catch and handle it or just throw it. Now, because of join(), the main thread cannot terminate before the virtual thread. It has to wait until the virtual thread completes.
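
For example, here is a minimal sketch (the task here is just a hypothetical placeholder) that starts a virtual thread and waits at most two seconds for it to terminate:

Runnable task = () -> logger.info("Doing some work ..."); // hypothetical task

Thread vThread = Thread.ofVirtual().start(task);

// join(Duration) returns true if the thread terminated within the timeout
boolean terminated = vThread.join(Duration.ofSeconds(2));
logger.info(() -> "Terminated in time? " + terminated);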

Creating an unstarted virtual thread

Creating an unstarted virtual thread can be done via unstarted(Runnable task) as follows:

Thread vThread = Thread.ofVirtual().unstarted(task);

Or, via Thread.Builder as follows:

Thread.Builder builder = Thread.ofVirtual();
Thread vThread = builder.unstarted(task);

This time, the thread is not scheduled for execution. It will be scheduled for execution only after we explicitly call the start() method:

vThread.start();

We can check if a thread is alive (it was started but not terminated) via the isAlive() method:

boolean isalive = vThread.isAlive();

The unstarted() method is available for platform threads as well (there is also the Thread.Builder.OfPlatform subinterface):

Thread pThread = Thread.ofPlatform().unstarted(task);

We can start pThread by calling the start() method.

Creating a ThreadFactory for virtual threads

You can create a ThreadFactory of virtual threads as follows:

ThreadFactory tfVirtual = Thread.ofVirtual().factory();

Or, with a name prefix and a counter:

ThreadFactory tfVirtual = Thread.ofVirtual()
  .name("vt-", 0).factory(); // 'vt-' name prefix, 0 counter

Or, via Thread.Builder as follows:

Thread.Builder builder = Thread.ofVirtual().name("vt-", 0);
ThreadFactory tfVirtual = builder.factory();

And, a ThreadFactory for platform threads as follows (you can use Thread.Builder as well):

ThreadFactory tfPlatform = Thread.ofPlatform()
  .name("pt-", 0).factory(); // 'pt-' name prefix, 0 counter

Or, a ThreadFactory that we can use to switch between virtual/platform threads as follows:

static class SimpleThreadFactory implements ThreadFactory {
  @Override
  public Thread newThread(Runnable r) {
    // return new Thread(r);                // platform thread
    return Thread.ofVirtual().unstarted(r); // virtual thread
  }
}

Next, we can use any of these factories via ThreadFactory#newThread(Runnable task) as follows:

tfVirtual.newThread(task).start();
tfPlatform.newThread(task).start();
SimpleThreadFactory stf = new SimpleThreadFactory();
stf.newThread(task).start();

If the thread factory starts the created thread as well then there is no need to explicitly call the start() method.
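
As a usage sketch (assuming the tfVirtual factory and the task defined earlier), such a factory can also be plugged into a thread-per-task executor, so every submitted task gets its own fresh virtual thread:

try (ExecutorService executor
    = Executors.newThreadPerTaskExecutor(tfVirtual)) {
  executor.submit(task); // each task runs on a new virtual thread from tfVirtual
}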

Checking a virtual thread's details

Moreover, we can check if a certain thread is a platform thread or a virtual thread via isVirtual():

Thread vThread = Thread.ofVirtual()
  .name("my_vThread").unstarted(task);
Thread pThread1 = Thread.ofPlatform()
  .name("my_pThread").unstarted(task);
Thread pThread2 = new Thread(() -> {});
logger.info(() -> "Is vThread virtual ? "
  + vThread.isVirtual());  // true
logger.info(() -> "Is pThread1 virtual ? "
  + pThread1.isVirtual()); // false
logger.info(() -> "Is pThread2 virtual ? "
  + pThread2.isVirtual()); // false

Obviously, only vThread is a virtual thread.

A virtual thread always runs as a daemon thread. The isDaemon() method returns true, and trying to call setDaemon(false) will throw an exception. The priority of a virtual thread is always NORM_PRIORITY (calling getPriority() always returns 5, the constant int for NORM_PRIORITY). Calling setPriority() with a different value has no effect. A virtual thread cannot be part of a custom thread group because it already belongs to the VirtualThreads group; calling getThreadGroup().getName() returns VirtualThreads. A virtual thread has no permissions with the Security Manager (which is deprecated anyway).
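
A quick sketch that verifies these claims on an unstarted virtual thread (the no-op task is just a placeholder) could look as follows:

Thread vThread = Thread.ofVirtual().unstarted(() -> { });

logger.info(() -> "Daemon: " + vThread.isDaemon());      // true
logger.info(() -> "Priority: " + vThread.getPriority()); // 5 (NORM_PRIORITY)
logger.info(() -> "Group: "
  + vThread.getThreadGroup().getName());                 // VirtualThreads

vThread.setPriority(Thread.MAX_PRIORITY);                // silently ignored
logger.info(() -> "Priority: " + vThread.getPriority()); // still 5
// vThread.setDaemon(false); // throws an exception, as noted above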

Explaining how virtual threads work (continued)

Capturing virtual threads

So far, we have learned that a virtual thread is mounted by the JVM to a platform thread, which becomes its carrier thread. Moreover, the carrier thread runs the virtual thread until it hits a blocking (I/O) operation. At that point, the virtual thread is unmounted from the carrier thread, and it will be rescheduled after the blocking (I/O) operation is done.

While this scenario is true for most blocking operations, resulting in unmounting the virtual thread and freeing the platform thread (and the underlying OS thread), there are a few exceptional cases when virtual threads are not unmounted. There are two main causes for this behavior:

  • Limitations on the OS (for instance, a significant number of filesystem operations)
  • Limitations on the JDK (for instance, Object#wait())

When the virtual thread cannot be unmounted from its carrier thread, it means that the carrier thread and the underlying OS thread are blocked. This may affect the scalability of the application, so, if the platform thread pool allows it, the JVM can decide to add one more platform thread. So, for a period of time, the number of platform threads may exceed the number of available cores.

Pinning virtual threads

There are also two other use cases when a virtual thread cannot be unmounted:

  • When the virtual thread runs code inside a synchronized method/block
  • When the virtual thread invokes a foreign function or a native method (a topic covered in Chapter 7)

In these scenarios, we say that the virtual thread is pinned to the carrier thread. This may affect the scalability of the application, but the JVM will not increase the number of platform threads. Instead, we should take action and refactor the synchronized blocks to ensure that the locking code is simple, clear, and short. Whenever possible, prefer java.util.concurrent locks over synchronized blocks. If we manage to avoid long and frequent locking periods, then we will not face any significant scalability issues. In future releases, the JDK team aims to eliminate pinning inside synchronized blocks.
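
For instance, here is a minimal sketch of such a refactoring (the Inventory class and its addStock() method are hypothetical), replacing a synchronized block with a ReentrantLock so the virtual thread is not pinned while waiting for the lock:

import java.util.concurrent.locks.ReentrantLock;

public class Inventory {

  private final ReentrantLock lock = new ReentrantLock();
  private int stock;

  // before: public synchronized void addStock(int amount) { stock += amount; }
  public void addStock(int amount) {
    lock.lock();       // a virtual thread blocked here can still be unmounted
    try {
      stock += amount; // keep the guarded section short and simple
    } finally {
      lock.unlock();
    }
  }
}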

Exemplifying thread context switching (continued)

Example 2

In this example, let’s start by limiting the parallelism to 1 (this is like having a single core, and therefore a single carrier worker, at our disposal):

System.setProperty(
  "jdk.virtualThreadScheduler.parallelism", "1");
System.setProperty(
  "jdk.virtualThreadScheduler.maxPoolSize", "1");
System.setProperty(
  "jdk.virtualThreadScheduler.minRunnable", "1");

Next, let’s consider that we have a slow task (we call it slow because it sleeps for 5 seconds):

Runnable slowTask = () -> {
  logger.info(() -> Thread.currentThread().toString()
    + " | working on something");
  logger.info(() -> Thread.currentThread().toString()
    + " | break time (blocking)");
  try { Thread.sleep(Duration.ofSeconds(5)); }
    catch (InterruptedException ex) {} // blocking
  logger.info(() -> Thread.currentThread().toString()
    + " | work done");
};

And, a fast task (similar to the slow task but sleeps only 1 second):

Runnable fastTask = () -> {
  logger.info(() -> Thread.currentThread().toString()
    + " | working on something");
  logger.info(() -> Thread.currentThread().toString()
    + " | break time (blocking)");
  try { Thread.sleep(Duration.ofSeconds(1)); }
    catch (InterruptedException ex) {} // blocking
  logger.info(() -> Thread.currentThread().toString()
    + " | work done");
};

Next, we define two virtual threads to execute these two tasks as follows:

Thread st = Thread.ofVirtual()
  .name("slow-", 0).start(slowTask);
Thread ft = Thread.ofVirtual()
  .name("fast-", 0).start(fastTask);
      
st.join();
ft.join();

If we run this code then the output will be as follows:

[08:38:46] VirtualThread[#22,slow-0]/runnable
           @ForkJoinPool-1-worker-1 | working on something
[08:38:46] VirtualThread[#22,slow-0]/runnable
           @ForkJoinPool-1-worker-1 | break time (blocking)
[08:38:46] VirtualThread[#24,fast-0]/runnable
           @ForkJoinPool-1-worker-1 | working on something
[08:38:46] VirtualThread[#24,fast-0]/runnable
           @ForkJoinPool-1-worker-1 | break time (blocking)
[08:38:47] VirtualThread[#24,fast-0]/runnable
           @ForkJoinPool-1-worker-1 | work done
[08:38:51] VirtualThread[#22,slow-0]/runnable
           @ForkJoinPool-1-worker-1 | work done

If we analyze this output, we notice that the execution starts with the slow task. The fast task cannot be executed since worker-1 (the only available worker) is busy executing the slow task.

[08:38:46] VirtualThread[#22,slow-0]/runnable
           @ForkJoinPool-1-worker-1 | working on something

Worker-1 executes the slow task until this task hits the sleeping operation. Since this is a blocking operation, the corresponding virtual thread (#22) is unmounted from worker-1.

[08:38:46] VirtualThread[#22,slow-0]/runnable
           @ForkJoinPool-1-worker-1 | break time (blocking)

The JVM takes advantage of the fact that worker-1 is available and pushes the fast task for execution.

[08:38:46] VirtualThread[#24,fast-0]/runnable
           @ForkJoinPool-1-worker-1 | working on something

The fast task also hits a sleeping operation and its virtual thread (#24) is unmounted.

[08:38:46] VirtualThread[#24,fast-0]/runnable
           @ForkJoinPool-1-worker-1 | break time (blocking)

But the fast task sleeps for only 1 second, so its blocking operation is over before that of the slow task, which is still sleeping. So, the JVM can schedule the fast task for execution again, and worker-1 is ready to accept it.

[08:38:47] VirtualThread[#24,fast-0]/runnable
           @ForkJoinPool-1-worker-1 | work done

At this moment, the fast task is done and worker-1 is free. But, the slow task is still sleeping. After these long 5 seconds, the JVM schedules the slow task for execution and worker-1 is there to take it.

[08:38:51] VirtualThread[#22,slow-0]/runnable
           @ForkJoinPool-1-worker-1 | work done

Done!

Example 3

This example is just a slight modification of Example 2. This time, let’s consider that the slow task contains a non-blocking operation that runs forever. In this case, this operation is simulated via an infinite loop:

Runnable slowTask = () -> {
  logger.info(() -> Thread.currentThread().toString()
    + " | working on something");
  logger.info(() -> Thread.currentThread().toString()
    + " | break time (non-blocking)");
  while(dummyTrue()) {} // non-blocking
  logger.info(() -> Thread.currentThread().toString()
    + " | work done");
};
static boolean dummyTrue() { return true; }

We have a single worker (worker-1) and the fast task is the same as in Example 2. If we run this code, the execution hangs on as follows:

[09:02:45] VirtualThread[#22,slow-0]/runnable
           @ForkJoinPool-1-worker-1 | working on something
[09:02:45] VirtualThread[#22,slow-0]/runnable
           @ForkJoinPool-1-worker-1 | break time(non-blocking)
// hang on

The execution hangs because the infinite loop is not seen as a blocking operation. In other words, the virtual thread of the slow task (#22) is never unmounted. Since there is a single worker, the JVM cannot push the fast task for execution.

If we increase the parallelism from 1 to 2, then the fast task will be successfully executed by worker-2, while worker-1 (executing the slow task) will simply hang with a partial execution. We can defend against such situations by relying on a timed join, such as join(Duration duration). This way, after the given timeout, the calling thread stops waiting for the slow task and can react (for instance, by interrupting it). So, pay attention to such scenarios.
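
A minimal defensive sketch (reusing the st and ft threads started earlier) might look like this:

// wait at most 10 seconds for the slow task; join(Duration) returns
// false if the virtual thread is still running when the timeout expires
boolean finished = st.join(Duration.ofSeconds(10));
if (!finished) {
  // best-effort: request interruption (a loop that never checks the
  // interrupt flag, like dummyTrue(), will still ignore it)
  st.interrupt();
}
ft.join();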


217. Introducing the ExecutorService invoke all/any for virtual threads – part 2

Earlier, in Problem 203, we wrote a piece of “unstructured” concurrency code for building a testing team of three testers served by an external server. Now, let’s try to rewrite the buildTestingTeam() method via invokeAll()/invokeAny() and newVirtualThreadPerTaskExecutor(). If we rely on invokeAll(), then the application will attempt to load the three testers by id as follows:

public static TestingTeam buildTestingTeam()
      throws InterruptedException {
      
  try (ExecutorService executor
      = Executors.newVirtualThreadPerTaskExecutor()) {
    List<Future<String>> futures = executor.invokeAll(
      List.of(() -> fetchTester(1),
              () -> fetchTester(2),
              () -> fetchTester(3)));
          
    futures.forEach(f -> logger.info(() -> "State: "
      + f.state()));
    return new TestingTeam(futures.get(0).resultNow(),
      futures.get(1).resultNow(), futures.get(2).resultNow());                      
  }      
}

We have three testers with ids 1, 2, and 3. So, the output will be:

[07:47:32] State: SUCCESS
[07:47:32] State: SUCCESS
[07:47:32] State: SUCCESS

In the next problem, we will see how we can make decisions based on task state. If we can handle the testing phase even with a single tester, then we can rely on invokeAny() as follows:

public static TestingTeam buildTestingTeam()
      throws InterruptedException, ExecutionException {
      
  try (ExecutorService executor
    = Executors.newVirtualThreadPerTaskExecutor()) {
    String result = executor.invokeAny(
      List.of(() -> fetchTester(1),
              () -> fetchTester(2),
              () -> fetchTester(3)));
          
    logger.info(result);
    return new TestingTeam(result);
  }      
}

This code will return a single result representing one of these three testers. If none of them is available, then we will get a UserNotFoundException.
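
A minimal sketch of handling that case (UserNotFoundException comes from the bundled server code) could wrap the call as follows:

try {
  TestingTeam team = buildTestingTeam();
  // continue the testing phase with the single available tester
} catch (ExecutionException ex) {
  // invokeAny() failed for all three testers; the cause carries the reason
  logger.severe(() -> "No tester available: " + ex.getCause());
} catch (InterruptedException ex) {
  Thread.currentThread().interrupt();
}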

218. Hooking task state

Starting with JDK 19, we can rely on Future#state(). This method computes the state of a Future based on the well-known get(), isDone(), and isCancelled(), and returns a Future.State enum entry as follows:

  • CANCELLED – The task was canceled.
  • FAILED – The task was completed exceptionally (with an exception).
  • RUNNING – The task is still running (has not been completed).
  • SUCCESS – The task was completed normally with a result (no exception).

In the following snippet of code, we analyze the state of loading the testing team members and act accordingly:

public static TestingTeam buildTestingTeam()
       throws InterruptedException {
  List<String> testers = new ArrayList<>();
      
  try (ExecutorService executor
      = Executors.newVirtualThreadPerTaskExecutor()) {
    List<Future<String>> futures = executor.invokeAll(
      List.of(() -> fetchTester(Integer.MAX_VALUE),
              () -> fetchTester(2),
              () -> fetchTester(Integer.MAX_VALUE)));
    futures.forEach(f -> {
      logger.info(() -> "Analyzing " + f + " state …");

      switch (f.state()) {
        case RUNNING -> throw new IllegalStateException(
          "Future is still in the running state …");
        case SUCCESS -> {
          logger.info(() -> "Result: " + f.resultNow());
          testers.add(f.resultNow());
        }
        case FAILED ->
          logger.severe(() -> "Exception: "
            + f.exceptionNow().getMessage());
        case CANCELLED ->
          logger.info("Cancelled ?!?");
      }
    });                      
  }
      
  return new TestingTeam(testers.toArray(String[]::new));
}

We know that when the execution reaches the switch block, the Future objects should be completed, either normally or exceptionally. So, if the current Future state is RUNNING, then this is a really weird situation (possibly a bug) and we throw an IllegalStateException. Next, if the Future state is SUCCESS (fetchTester(2)), then we have a result that can be obtained via resultNow(). This method was added in JDK 19 and is useful when we know that we have a result; resultNow() returns immediately, without waiting (unlike get()). If the state is FAILED (fetchTester(Integer.MAX_VALUE)), then we log the exception via exceptionNow(). This method was also added in JDK 19, and it immediately returns the underlying exception of a failed Future. Finally, if the Future was canceled, then there is nothing to do; we just report it in the log.


219. Combining newVirtualThreadPerTaskExecutor() and streams

Streams and newVirtualThreadPerTaskExecutor() make a handy combination. Here is an example that relies on IntStream to submit 10 simple tasks and collect the returned List of Future:

try (ExecutorService executor
      = Executors.newVirtualThreadPerTaskExecutor()) {
  List<Future<String>> futures = IntStream.range(0, 10)
    .mapToObj(i -> executor.submit(() -> {
       return Thread.currentThread().toString()
         + "(" + i + ")";
  })).collect(toList());
  // here we have the following snippet of code
}

Next, we wait for each Future to complete by calling the get() method:

  futures.forEach(f -> {
    try {
      logger.info(f.get());
    } catch (InterruptedException | ExecutionException ex) {
      // handle exception
    }
  });

Moreover, using stream pipelines is quite useful in combination with invokeAll(). For instance, the following stream pipeline returns a List of results (it filters out all the Future objects that haven’t completed successfully):

List<String> results = executor.invokeAll(
  List.of(() -> "pass01", () -> "pass02", () -> "pass03"))
  .stream()
  .filter(f -> f.state() == Future.State.SUCCESS)
  .<String>mapMulti((f, c) -> {
    c.accept((String) f.resultNow());
  }).collect(Collectors.toList());

Alternatively, we can write the following solution (without mapMulti()):

List<String> results = executor.invokeAll(
  List.of(() -> "pass01", () -> "pass02", () -> "pass03"))
  .stream()
  .filter(f -> f.state() == Future.State.SUCCESS)
  .map(f -> f.resultNow().toString())
  .toList();

Of course, if List<Object> is all you need, then you can go straight to Future::resultNow as follows:

List<Object> results = executor.invokeAll(
  List.of(() -> "pass01", () -> "pass02", () -> "pass03"))
  .stream()
  .filter(f -> f.state() == Future.State.SUCCESS)
  .map(Future::resultNow)
  .toList();

On the other hand, you may need to collect all the Future objects that have completed exceptionally. This can be achieved via exceptionNow() as follows (we intentionally sneaked into the given List<Callable> a Callable that will generate a StringIndexOutOfBoundsException, () -> "pass02".substring(50)):

List<Throwable> exceptions = executor.invokeAll(
  List.of(() -> "pass01",
          () -> "pass02".substring(50), () -> "pass03"))
  .stream()
  .filter(f -> f.state() == Future.State.FAILED)
  .<Throwable>mapMulti((f, c) -> {
    c.accept((Throwable) f.exceptionNow());
  }).collect(Collectors.toList());

If you prefer not to use mapMulti(), then rely on the classical approach:

List<Throwable> exceptions = executor.invokeAll(
  List.of(() -> "pass01", () -> "pass02".substring(50),
          () -> "pass03"))
  .stream()
  .filter(f -> f.state() == Future.State.FAILED)
  .map(Future::exceptionNow)
  .toList();

You can find all these examples in the bundled code.


220. Introducing scope object (StructuredTaskScope)

So far, we have covered a bunch of problems that use virtual threads directly or indirectly via an ExecutorService. We already know that virtual threads are cheap to create and block and that an application can run millions of them. We don’t need to reuse them, pool them, or do any fancy stuff. Use and throw away is the proper and recommended way to deal with virtual threads. This means that virtual threads are very useful for expressing and writing asynchronous code, which is commonly based on a lot of threads that are capable of blocking/unblocking many times in a short period of time. On the other hand, we know that OS threads are expensive to create, very expensive to block, and not easy to put in an asynchronous context.

Before virtual threads (so, for many, many years), we had to manage the lifecycle of OS threads via an ExecutorService/Executor, and we could write asynchronous (or reactive) code via callbacks (you can find detailed coverage of asynchronous programming in Java Coding Problems, First Edition, Chapter 11). However, asynchronous/reactive code is hard to write/read, very hard to debug and profile, and almost deadly hard to unit test. Nobody wants to read and fix your asynchronous code! Moreover, once we start to write an application via asynchronous callbacks, we tend to use this model for all tasks, even for those that shouldn’t be asynchronous. We can easily fall into this trap when we need to somehow link asynchronous code/results to non-asynchronous code. And the easiest way to do it is to go only for asynchronous code.

So, is there a better way? Yes, there is! Structured concurrency should be the right answer. Structured concurrency started as an incubator project and reached the preview stage in JDK 21 (JEP 453). And, in this context, we should introduce StructuredTaskScope. A StructuredTaskScope is a virtual thread launcher for Callable tasks that returns a Subtask. A subtask is an extension of the well-known Supplier<T> functional interface represented by the StructuredTaskScope.Subtask<T> interface and forked with StructuredTaskScope.fork(Callable task). It follows and works based on the fundamental principle of structured concurrency (see Problem 203): when a task has to be solved concurrently, then all the threads needed to solve it are spun and rejoined in the same block of code. In other words, all these threads’ lifetime is bound to the block’s scope, so we have clear and explicit entry-exit points for each concurrent code block. These threads are responsible for running the subtasks (Subtask) of the given task as a single unit of work.

Let’s have an example of fetching a single tester (with id 1) from our web server via StructuredTaskScope:

public static TestingTeam buildTestingTeam()
       throws InterruptedException {
     
  try (StructuredTaskScope<String> scope
      = new StructuredTaskScope<>()) {
    Subtask<String> subtask
      = scope.fork(() -> fetchTester(1));
    logger.info(() -> "Waiting for " + subtask.toString()
      + " to finish …\n");
    scope.join();          
    String result = subtask.get();
    logger.info(result);
          
    return new TestingTeam(result);
  }      
}
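
And, as a quick sketch (assuming the same fetchTester() helper and the TestingTeam record from the bundled code), forking several subtasks in the same scope follows exactly the same pattern:

public static TestingTeam buildTestingTeam()
       throws InterruptedException {

  try (StructuredTaskScope<String> scope
      = new StructuredTaskScope<>()) {
    Subtask<String> subtask1 = scope.fork(() -> fetchTester(1));
    Subtask<String> subtask2 = scope.fork(() -> fetchTester(2));
    Subtask<String> subtask3 = scope.fork(() -> fetchTester(3));

    scope.join(); // wait for all forks in this block

    // get() is non-blocking and valid only after join(),
    // for subtasks that completed successfully
    return new TestingTeam(
      subtask1.get(), subtask2.get(), subtask3.get());
  }
}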


222. Introducing ShutdownOnFailure

As its name suggests, StructuredTaskScope.ShutdownOnFailure is capable of returning the exception of the first subtask that completes exceptionally and interrupting the rest of the subtasks (threads). For instance, we may want to fetch the testers with ids 1, 2, and 3. Since we need exactly these three testers, we want to be informed if any of them is not available and cancel everything. The code looks as follows:

public static TestingTeam buildTestingTeam()
       throws InterruptedException, ExecutionException {
      
  try (ShutdownOnFailure scope
      = new StructuredTaskScope.ShutdownOnFailure()) {
    Subtask<String> subtask1
      = scope.fork(() -> fetchTester(1));
    Subtask<String> subtask2
      = scope.fork(() -> fetchTester(2));
    Subtask<String> subtask3
      = scope.fork(() -> fetchTester(Integer.MAX_VALUE));
    scope.join();
    logger.info(() -> "Subtask-1 state: " + subtask1.state());
    logger.info(() -> "Subtask-2 state: " + subtask2.state());
    logger.info(() -> "Subtask-3 state: " + subtask3.state());
    Optional<Throwable> exception = scope.exception();
    if (exception.isEmpty()) {
      logger.info(() -> "Subtask-1 result:" + subtask1.get());
      logger.info(() -> "Subtask-2 result:" + subtask2.get());
      logger.info(() -> "Subtask-3 result:" + subtask3.get());
              
      return new TestingTeam(
        subtask1.get(), subtask2.get(), subtask3.get());
    } else {
      logger.info(() -> exception.get().getMessage());
      scope.throwIfFailed();
    }
  }
       
  return new TestingTeam();
}

In this example, we intentionally replaced id 3 with Integer.MAX_VALUE. Since there is no tester with this id, the server will throw UserNotFoundException. This means that the states of the subtasks will reveal that the third subtask has failed:

[16:41:15] Subtask-1 state: SUCCESS
[16:41:15] Subtask-2 state: SUCCESS
[16:41:15] Subtask-3 state: FAILED

Moreover, when we call the exception() method, we will get back an Optional<Throwable> containing this exception (deep coverage of Optional is available in Java Coding Problems, First Edition, Chapter 12). If we decide to throw it, then we simply call the throwIfFailed() method, which wraps the original exception (the cause) in an ExecutionException and throws it. The message of the exception in our case will be:

Exception in thread "main"
 java.util.concurrent.ExecutionException:
  modern.challenge.UserNotFoundException: Code: 404

If we remove the inspection code (the logging and state checks), then we can compact the previous code as follows:

public static TestingTeam buildTestingTeam()
       throws InterruptedException, ExecutionException {
      
  try (ShutdownOnFailure scope
      = new StructuredTaskScope.ShutdownOnFailure()) {
    Subtask<String> subtask1
      = scope.fork(() -> fetchTester(1));
    Subtask<String> subtask2
      = scope.fork(() -> fetchTester(2));
    Subtask<String> subtask3
      = scope.fork(() -> fetchTester(
      Integer.MAX_VALUE)); // this causes exception
    scope.join();
    scope.throwIfFailed();
    // because we have an exception the following
    // code will not be executed
    return new TestingTeam(
      subtask1.get(), subtask2.get(), subtask3.get());          
  }
}

If no exception occurs, then throwIfFailed() doesn’t do anything and those three testers are available. The result of each Subtask is available via the non-blocking Subtask.get(). A subtask that completes exceptionally under the ShutdownOnFailure umbrella will be chosen to produce the exception. However, if all subtasks complete normally, then we will not get any exception. On the other hand, if no subtask completed exceptionally but some were canceled, then ShutdownOnFailure will throw CancellationException.