Hooking virtual threads and sync code – Concurrency – Virtual Threads, Structured Concurrency

214. Hooking virtual threads and sync code

The goal of this problem is to highlight how virtual threads interact with synchronous code. For this, we use the built-in java.util.concurrent.SynchronousQueue. This is a built-in blocking queue that allows only one thread to operate at a time. More precisely, a thread that wants to insert an element in this queue is blocked until another thread attempts to remove an element from it and vice versa. Basically, a thread cannot insert an element unless another thread attempts to remove an element.Let’s assume that a virtual thread attempts to insert in a SynchronousQueue, while a platform thread attempts to remove from this queue. In code lines, we have:

SynchronousQueue<Integer> queue = new SynchronousQueue<>();
Runnable task = () -> {
  logger.info(() -> Thread.currentThread().toString()
    + ” sleeps for 5 seconds”);
  try { Thread.sleep(Duration.ofSeconds(5)); }
    catch (InterruptedException ex) {}
  logger.info(() -> “Running “
    + Thread.currentThread().toString());
  queue.add(Integer.MAX_VALUE);
};
logger.info(“Before running the task …”);
Thread vThread = Thread.ofVirtual().start(task);
logger.info(vThread.toString());

So, the virtual thread (vThread) waits for 5 seconds before attempting to insert an element in the queue. However, it will not successfully insert an element until another thread attempts to remove an element from this queue:

logger.info(() -> Thread.currentThread().toString()
  + ” can’t take from the queue yet”);
int maxint = queue.take();                      
logger.info(() -> Thread.currentThread().toString()
  + “took from queue: ” + maxint);              
      
logger.info(vThread.toString());
logger.info(“After running the task …”);

Here, the Thread.currentThread() refers to the main thread of the application which is a platform thread not blocked by vThread. This thread successfully removes from the queue only if another thread attempts to insert (here, vThread):The output of this code looks as follows:

[09:41:59] Before running the task …
[09:42:00] VirtualThread[#22]/runnable
[09:42:00] Thread[#1,main,5,main]
           can’t take from the queue yet
[09:42:00] VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
           sleeps for 5 seconds
[09:42:05] VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
           inserts in the queue
[09:42:05] Thread[#1,main,5,main]took from queue: 2147483647
[09:42:05] VirtualThread[#22]/terminated
[09:42:05] After running the task …

The virtual thread started its execution (is in runnable state) but the main thread cannot remove from the queue until the virtual thread will insert an element, so it is blocked by the queue.take() operation:

[09:42:00] VirtualThread[#22]/runnable
[09:42:00] Thread[#1,main,5,main]
           can’t take from the queue yet

Meanwhile, the virtual thread sleeps for 5 seconds (at this time the main thread has nothing to do) and afterward, it inserts an element:

[09:42:00] VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
           sleeps for 5 seconds
[09:42:05] VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
           inserts in the queue

The virtual thread has inserted an element into the queue, so the main thread can remove this element from it:

[09:42:05] Thread[#1,main,5,main]took from queue: 2147483647

The virtual thread is also terminated:

[09:42:05] VirtualThread[#22]/terminated

So, virtual threads, platform threads, and synchronous code work as expected. In the bundled code, you can find an example where the virtual and platform threads switch their places. So, the platform thread attempts to insert and the virtual thread attempts to remove.