Introducing scope object (StructuredTaskScope) 2 – Concurrency – Virtual Threads, Structured Concurrency

First, we create a StructuredTaskScope in a try-with-resources pattern. StructuredTaskScope implements AutoCloseable:

try (StructuredTaskScope<String> scope
      = new StructuredTaskScope<>()) {
    …
}

The scope is a wrapper for the virtual threads’ lifetime. We use the scope to fork as many virtual threads (subtasks) as needed via the fork(Callable task) method. Here, we fork only one virtual thread and get back a Subtask (forking is a non-blocking operation):

Subtask<String> subtask = scope.fork(() -> fetchTester(1));
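Since a scope can fork as many subtasks as needed, the following minimal sketch forks one subtask per tester id and collects the results in fork order. It assumes JDK 21 with preview features enabled (--enable-preview). The class name ForkManySketch and the stubbed fetchTester() are hypothetical stand-ins, not part of the original example, and the join() call it relies on is explained right after this snippet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;

class ForkManySketch {

  // Hypothetical stand-in for the remote call used in this problem
  static String fetchTester(int id) {
    return "tester-" + id;
  }

  static List<String> fetchTesters(int count) throws InterruptedException {
    try (var scope = new StructuredTaskScope<String>()) {
      List<Subtask<String>> subtasks = new ArrayList<>();
      for (int id = 1; id <= count; id++) {
        int testerId = id; // lambdas can capture only effectively final variables
        subtasks.add(scope.fork(() -> fetchTester(testerId))); // non-blocking
      }
      scope.join(); // wait for every forked subtask
      // safe here because all stubbed subtasks complete successfully
      return subtasks.stream().map(Subtask::get).toList();
    }
  }
}
```

Calling ForkManySketch.fetchTesters(3) returns the list [tester-1, tester-2, tester-3]. The list keeps fork order, not completion order.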

Next, we have to call the join() method (or joinUntil(Instant deadline)). This method waits for all threads (all Subtask instances) forked from this scope to complete, so it is a blocking operation. A scope should block only while waiting for its subtasks to complete, and this happens via join() or joinUntil().

scope.join();          
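As a hedged illustration of the deadline variant, the sketch below waits at most a given number of milliseconds via joinUntil(Instant) and falls back to a placeholder value when the deadline expires. It assumes JDK 21 with preview features enabled (--enable-preview). The names JoinUntilSketch, slowFetch(), and the "timed out" fallback are hypothetical choices made for this example.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.TimeoutException;

class JoinUntilSketch {

  // Hypothetical stand-in for a slow remote call
  static String slowFetch(long millis) throws InterruptedException {
    Thread.sleep(millis);
    return "tester-1";
  }

  // Returns the subtask result, or a fallback if the deadline expires first
  static String fetchWithDeadline(long taskMillis, long timeoutMillis)
      throws InterruptedException {
    try (var scope = new StructuredTaskScope<String>()) {
      Subtask<String> subtask = scope.fork(() -> slowFetch(taskMillis));
      try {
        scope.joinUntil(Instant.now().plus(Duration.ofMillis(timeoutMillis)));
      } catch (TimeoutException e) {
        // Deadline reached: do not call get(); closing the scope
        // interrupts the subtask that is still running
        return "timed out";
      }
      return subtask.state() == Subtask.State.SUCCESS
          ? subtask.get() : "failed";
    }
  }
}
```

With these stubs, fetchWithDeadline(10, 2000) returns "tester-1", while fetchWithDeadline(2000, 50) returns "timed out".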

When the execution passes this line, we know that all threads (all Subtask instances) forked from this scope have completed with a result or an exception (each subtask runs independently, so each of them can complete with a result or an exception). Here, we call the non-blocking get() method to get the result, but pay attention: calling get() before join() has returned (so the subtask may not have completed yet) throws an IllegalStateException("Owner did not join after forking subtask"):

String result = subtask.get();
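The ordering rule between join() and get() can be checked with a small sketch. It assumes JDK 21 with preview features enabled (--enable-preview). The class and method names are hypothetical, and the code deliberately checks only the exception type, not its message.

```java
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;

class GetBeforeJoinSketch {

  // Returns true if get() is rejected before join() and accepted after it
  static boolean getIsRejectedBeforeJoin() throws InterruptedException {
    try (var scope = new StructuredTaskScope<String>()) {
      Subtask<String> subtask = scope.fork(() -> "tester-1");

      boolean rejected = false;
      try {
        subtask.get(); // the owner thread has not joined yet
      } catch (IllegalStateException e) {
        rejected = true;
      }

      scope.join(); // now every forked subtask is complete
      return rejected && "tester-1".equals(subtask.get());
    }
  }
}
```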

On the other hand, we can obtain the exception of a failed task via exception(). However, if we call exception() for a subtask (Subtask) that completed with a result, we get an IllegalStateException("Subtask not completed or did not complete with exception"). So, if you are not sure whether your tasks complete with a result or an exception, it is better to call get() or exception() only after testing the state of the corresponding Subtask. A state of SUCCESS safely allows you to call get(), while a state of FAILED safely allows you to call exception(). So, in our case, we may prefer it this way:

String result = "";
if (subtask.state().equals(Subtask.State.SUCCESS)) {
  result = subtask.get();
}

Besides Subtask.State.SUCCESS and Subtask.State.FAILED, we also have Subtask.State.UNAVAILABLE, which means that the subtask's outcome is not available (for instance, if the subtask is still running then its state is UNAVAILABLE, but there can be other causes as well). That’s all!
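The three states can be handled in one place with a switch. This is a sketch only, assuming JDK 21 with preview features enabled (--enable-preview). SubtaskStateSketch, describe(), and the failing task are hypothetical and exist only to produce one SUCCESS and one FAILED subtask.

```java
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;

class SubtaskStateSketch {

  // Picks get() or exception() based on the state, so neither call can
  // throw IllegalStateException once the owner has joined
  static String describe(Subtask<String> subtask) {
    return switch (subtask.state()) {
      case SUCCESS -> "result: " + subtask.get();
      case FAILED -> "failed: " + subtask.exception().getMessage();
      case UNAVAILABLE -> "unavailable";
    };
  }

  static String[] run() throws InterruptedException {
    try (var scope = new StructuredTaskScope<String>()) {
      Subtask<String> ok = scope.fork(() -> "tester-1");
      Subtask<String> bad = scope.fork(() -> {
        throw new IllegalArgumentException("no such tester");
      });
      scope.join(); // a plain scope lets both subtasks run to completion
      return new String[] { describe(ok), describe(bad) };
    }
  }
}
```

Here, run() returns the two strings "result: tester-1" and "failed: no such tester", because a failure in one subtask does not affect the other in a plain StructuredTaskScope.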

ExecutorService vs. StructuredTaskScope

The previous code looks like the code that we write via a classical ExecutorService, but there are two big differences between these solutions. First of all, an ExecutorService holds the precious platform threads and allows us to pool them. On the other hand, a StructuredTaskScope is just a thin launcher for virtual threads that are cheap and shouldn’t be pooled. So, once we’ve done our job, a StructuredTaskScope can be destroyed and garbage collected. Second, an ExecutorService holds a single queue for all the tasks, and the threads take from this queue whenever they have the chance to do it. A StructuredTaskScope relies on a fork/join pool to schedule its virtual threads, and each worker thread of that pool has its own wait queue. However, a worker thread can steal a task from another queue as well. This is known as the work-stealing pattern and it was covered in Java Coding Problems, First Edition, Chapter 11.
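For comparison, here is a minimal sketch of the same single fetch written against a classical ExecutorService. It uses only long-standing java.util.concurrent APIs. The pool size of 2, the class name, and the stubbed fetchTester() are assumptions made for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorServiceSketch {

  // Hypothetical stand-in for the remote call used in this problem
  static String fetchTester(int id) {
    return "tester-" + id;
  }

  static String fetchViaExecutor()
      throws InterruptedException, ExecutionException {
    // A small pool of platform threads, meant to be reused across tasks
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      Future<String> future = executor.submit(() -> fetchTester(1));
      return future.get(); // blocks until this one task completes
    } finally {
      executor.shutdown(); // the pool outlives the task unless we stop it
    }
  }
}
```

Note how the lifetime of the pool is managed separately from the task, whereas the scope in the previous snippets lives exactly as long as the try block that forks and joins the subtasks.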

220. Introducing scope object (StructuredTaskScope)

So far, we have covered a bunch of problems that use virtual threads directly or indirectly via an ExecutorService. We already know that virtual threads are cheap to create and block and that an application can run millions of them. We don’t need to reuse them, pool them, or do any fancy stuff. Use and throw away is the proper and recommended way to deal with virtual threads. This means that virtual threads are very useful for expressing and writing asynchronous code, which is commonly based on a lot of threads that are capable of blocking/unblocking many times in a short period of time. On the other hand, we know that OS threads are expensive to create, very expensive to block, and are not easy to put in an asynchronous context.

Before virtual threads (so, for many, many years), we had to manage the lifecycle of OS threads via an ExecutorService/Executor, and we could write asynchronous (or reactive) code via callbacks (you can find detailed coverage of asynchronous programming in Java Coding Problems, First Edition, Chapter 11).

However, asynchronous/reactive code is hard to write/read, very hard to debug and profile, and extremely hard to unit test. Nobody wants to read and fix your asynchronous code! Moreover, once we start to write an application via asynchronous callbacks, we tend to use this model for all tasks, even for those that shouldn’t be asynchronous. We can easily fall into this trap when we need to link asynchronous code/results to non-asynchronous code. And the easiest way to do it is to go only for asynchronous code.

So, is there a better way? Yes, there is! Structured Concurrency should be the right answer. Structured Concurrency started as an incubator project and reached the preview stage in JDK 21 (JEP 453).

And, in this context, we should introduce StructuredTaskScope. A StructuredTaskScope is a virtual thread launcher for Callable tasks that returns a Subtask. A subtask is an extension of the well-known Supplier<T> functional interface, represented by the StructuredTaskScope.Subtask<T> interface and forked with StructuredTaskScope.fork(Callable task). It follows and works based on the fundamental principle of structured concurrency (see Problem 203): when a task has to be solved concurrently, all the threads needed to solve it are spun up and rejoined in the same block of code. In other words, the lifetime of all these threads is bound to the block’s scope, so we have clear and explicit entry-exit points for each concurrent code block. These threads are responsible for running the subtasks (Subtask) of the given task as a single unit of work.

Let’s have an example of fetching a single tester (with id 1) from our web server via StructuredTaskScope:

public static TestingTeam buildTestingTeam()
       throws InterruptedException {

  try (StructuredTaskScope<String> scope
      = new StructuredTaskScope<>()) {
    // fork a single subtask in its own virtual thread (non-blocking)
    Subtask<String> subtask
      = scope.fork(() -> fetchTester(1));
    logger.info(() -> "Waiting for " + subtask.toString()
      + " to finish …\n");
    // block until the forked subtask completes
    scope.join();
    String result = subtask.get();
    logger.info(result);

    return new TestingTeam(result);
  }
}
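To try the method above in isolation, it can be wrapped in a small class with stubbed collaborators. This is only a sketch, assuming JDK 21 with preview features enabled (--enable-preview). The TestingTeam record, the fetchTester() stub with its simulated 50 ms latency, and the class name TestingTeamSketch are hypothetical stand-ins for the types used in this problem.

```java
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.logging.Logger;

class TestingTeamSketch {

  private static final Logger logger
      = Logger.getLogger(TestingTeamSketch.class.getName());

  // Hypothetical stand-in for the domain type used in this problem
  record TestingTeam(String tester) {}

  // Hypothetical stand-in for the call to the web server
  static String fetchTester(int id) throws InterruptedException {
    Thread.sleep(50); // simulate network latency
    return "tester-" + id;
  }

  static TestingTeam buildTestingTeam() throws InterruptedException {
    try (var scope = new StructuredTaskScope<String>()) {
      Subtask<String> subtask = scope.fork(() -> fetchTester(1));
      logger.info(() -> "Waiting for " + subtask + " to finish");
      scope.join();                  // blocks until the subtask completes
      String result = subtask.get(); // safe: the stub always succeeds
      logger.info(result);
      return new TestingTeam(result);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(buildTestingTeam());
  }
}
```

With preview features enabled on JDK 21, it can be launched from source with java --enable-preview --source 21 TestingTeamSketch.java.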