210. Introducing structured concurrency
If you are as old as I am, then you most probably started programming with BASIC or a similar unstructured programming language. Back then, an application was just a sequence of lines whose logic was driven by a bunch of GOTO statements that made the flow jump back and forth between the code lines like a kangaroo. Well, in Java, the building blocks of typical concurrent code are so primitive that the code looks somewhat like unstructured programming: it is hard to follow and hard to understand what’s going on. Moreover, a thread dump of a concurrent task doesn’t provide the needed answers.

Let’s follow a snippet of concurrent Java code and stop every time we have a question (always check the code below the question). The task is to concurrently load three testers by ID and team them up in a testing team. First, here is the server code (we will use this simple code in this problem and in subsequent problems):
public static String fetchTester(int id)
        throws IOException, InterruptedException {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest requestGet = HttpRequest.newBuilder()
        .GET()
        .uri(URI.create("https://reqres.in/api/users/" + id))
        .build();
    HttpResponse<String> responseGet = client.send(
        requestGet, HttpResponse.BodyHandlers.ofString());
    if (responseGet.statusCode() == 200) {
        return responseGet.body();
    }
    throw new UserNotFoundException("Code: "
        + responseGet.statusCode());
}
Next, the code that we are especially interested in starts as follows:
private static final ExecutorService executor
    = Executors.newFixedThreadPool(2);

public static TestingTeam buildTestingTeam()
        throws InterruptedException {
    …
First stop: As you can see, buildTestingTeam() throws an InterruptedException. So, if the thread executing buildTestingTeam() gets interrupted, how can we easily interrupt the following threads?
    Future<String> future1 = futureTester(1);
    Future<String> future2 = futureTester(2);
    Future<String> future3 = futureTester(3);
    try {
        …
Second stop: Here we have three get() calls, so the current thread waits for the other threads to complete. Can we easily observe those threads?
        String tester1 = future1.get();
        String tester2 = future2.get();
        String tester3 = future3.get();
        logger.info(tester1);
        logger.info(tester2);
        logger.info(tester3);
        return new TestingTeam(tester1, tester2, tester3);
    } catch (ExecutionException ex) {
        …
Third stop: If an ExecutionException is caught, then we know that one of these three Future instances has failed. Can we easily cancel the remaining two, or will they just hang around? Perhaps future1 fails while future2 and future3 complete successfully, or maybe future2 completes successfully while future3 just runs forever (a so-called orphan thread). This may lead to serious mismatches in the expected results, memory leaks, and so on.
        throw new RuntimeException(ex);
    } finally {
        …
Fourth stop: The next line of code shuts down the executor, but it is so easy to overlook. Is this the proper place to do it?
        shutdownExecutor(executor);
    }
}
Fifth stop: If you didn’t spot the previous line of code, then it is legitimate to ask yourself how and where this executor gets shut down.
public static Future<String> futureTester(int id) {
    return executor.submit(() -> fetchTester(id));
}
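To make the first, third, and fourth stops more concrete, here is a minimal sketch of the bookkeeping that plain Future-based code would need in order to cancel sibling tasks by hand and always shut down its executor. It is only an illustration under stated assumptions, not the approach this chapter builds toward: the class ManualCleanupSketch and the method runAllOrCancel() are hypothetical names, a cached thread pool is swapped in for the fixed pool, and the tasks are simple stand-ins for fetchTester() so that the example runs without network access.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class, not part of the original listing
class ManualCleanupSketch {

    // Hypothetical helper: runs all tasks and, if one of them fails or the
    // calling thread is interrupted, cancels every sibling task by hand
    public static List<String> runAllOrCancel(List<Callable<String>> tasks)
            throws InterruptedException, ExecutionException {

        // Assumption: a cached pool, so every task gets its own thread
        ExecutorService executor = Executors.newCachedThreadPool();
        List<Future<String>> futures = new ArrayList<>();

        try {
            for (Callable<String> task : tasks) {
                futures.add(executor.submit(task));
            }

            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                // Blocks in submission order; throws ExecutionException if the
                // task failed, InterruptedException if the caller is interrupted
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException ex) {
            // First and third stops: nothing cancels the siblings for us
            for (Future<String> future : futures) {
                future.cancel(true); // no effect on already completed tasks
            }
            throw ex;
        } finally {
            // Fourth stop: easy to forget, so it lives in finally
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch slowTaskStarted = new CountDownLatch(1);

        // Stand-ins for fetchTester(1), fetchTester(2), and fetchTester(3)
        List<Callable<String>> tasks = List.of(
            () -> "tester-1",
            () -> {
                slowTaskStarted.await(); // fail only once the slow task runs
                throw new IllegalStateException("tester 2 not found");
            },
            () -> {
                slowTaskStarted.countDown();
                Thread.sleep(10_000); // interrupted by cancel(true)
                return "tester-3";
            });

        try {
            runAllOrCancel(tasks);
        } catch (ExecutionException ex) {
            System.out.println("Failed: " + ex.getCause().getMessage());
        }
    }
}
```

Even with this extra code, two weaknesses remain. Because get() is called in submission order, a failure in the last task is noticed only after the earlier tasks have completed. And cancel(true) merely interrupts the worker thread, so a task that ignores interruption keeps running anyway.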