Microbenchmarks: BlockingQueues
This will be a relatively short post, but after my benchmarking post with locks, I became a bit curious about the performance of BlockingQueues.
BlockingQueue
BlockingQueues offer a thread-safe way of communicating between threads, using a queue abstraction. They can be bounded to apply backpressure and make sure data doesn’t fill up your memory immediately, and are safe to use across any number of threads.
The “blocking” part of it comes from the fact that you are able (but not required) to block on both the put and the take. The put will block when the queue is full, and the take will block until it can get an item from the queue.
In a sort of hand-wavey way, BlockingQueues are sort of analogous to CSP channels. This isn’t strictly true because they can be buffered, and because there’s no way to wait on multiple channels at the same time, but they still can work well for synchronization in applications that can be structured in a message-passing style.
There are two main general-purpose implementations of BlockingQueue: LinkedBlockingQueue and ArrayBlockingQueue.
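As a quick illustration of those blocking-versus-non-blocking semantics, here is a minimal sketch (the class name QueueDemo is made up for this example) showing a bounded queue applying backpressure. offer is the non-blocking sibling of put: it returns false instead of blocking when the queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // A bounded queue with room for two items.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

        queue.put("a"); // succeeds immediately
        queue.put("b"); // succeeds immediately; the queue is now full

        // offer() returns false rather than blocking when the queue is full.
        System.out.println(queue.offer("c")); // false

        // put("c") here would block until a consumer made room.
        System.out.println(queue.take());     // "a" -- FIFO order
        System.out.println(queue.offer("c")); // true, there is room again
    }
}
```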
The Test
Conventional wisdom says that ArrayBlockingQueue should be faster, simply because arrays tend to be faster than linked lists, but I’ve learned not to trust conventional wisdom for this kind of thing.
import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.LongStream;

public class Main {
    public static final int QUEUE_SIZE = 100_000;
    public static final long NUM_PRODUCER_THREADS = 10;
    public static final long NUM_CONSUMER_THREADS = 50;
    public static final int NUM_MESSAGES = 100_000_000;

    public static BlockingQueue<Long> linkedQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
    public static BlockingQueue<Long> arrayQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
    public static CountDownLatch linkedLatch = new CountDownLatch(NUM_MESSAGES);
    public static CountDownLatch arrayLatch = new CountDownLatch(NUM_MESSAGES);

    public static void doThreadStuff(BlockingQueue<Long> queue, CountDownLatch latch) {
        // Producers: each one puts NUM_MESSAGES copies of its own index.
        // Note: collectively that is NUM_PRODUCER_THREADS * NUM_MESSAGES puts,
        // while the latch only counts NUM_MESSAGES takes, so producers are
        // still running (blocked on a full queue) when the latch trips.
        LongStream.range(0, NUM_PRODUCER_THREADS).forEach(i -> {
            Thread.startVirtualThread(() -> {
                for (var j = 0; j < NUM_MESSAGES; j++) {
                    try {
                        queue.put(i); // blocks while the queue is full
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        });
        // Consumers: take items and count down until the latch hits zero.
        LongStream.range(0, NUM_CONSUMER_THREADS).forEach(i -> {
            Thread.startVirtualThread(() -> {
                while (true) {
                    try {
                        var _thing = queue.take(); // blocks while the queue is empty
                        latch.countDown();
                        if (latch.getCount() <= 0) {
                            break;
                        }
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        });
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Starting");

        var arrayStartTime = Duration.ofNanos(System.nanoTime());
        doThreadStuff(arrayQueue, arrayLatch);
        arrayLatch.await();
        var arrayEndTime = Duration.ofNanos(System.nanoTime());
        var arrayTotalTime = arrayEndTime.minus(arrayStartTime);

        var linkedStartTime = Duration.ofNanos(System.nanoTime());
        doThreadStuff(linkedQueue, linkedLatch);
        linkedLatch.await();
        var linkedEndTime = Duration.ofNanos(System.nanoTime());
        var linkedTotalTime = linkedEndTime.minus(linkedStartTime);

        System.out.println("Total Time For Linked: " + linkedTotalTime.toMillis() + "ms.");
        System.out.println("Total Time For Array: " + arrayTotalTime.toMillis() + "ms.");
    }
}
We spin up ten threads to put stuff into the queue, and fifty threads to consume. Each consumer is a relatively simple while(true) loop that waits for items to arrive.

We again use CountDownLatch as a means of blocking the main thread: it releases once the consumers have collectively taken NUM_MESSAGES items.
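The latch pattern on its own looks roughly like this minimal sketch (LatchDemo and the task count of three are made up for illustration): each worker counts the latch down once, and await() only returns after the count reaches zero.

```java
import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int tasks = 3;
        CountDownLatch latch = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            int id = i;
            Thread.startVirtualThread(() -> {
                System.out.println("task " + id + " done");
                latch.countDown(); // decrement the latch once per task
            });
        }
        latch.await(); // blocks until all three tasks have counted down
        System.out.println("all done");
    }
}
```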
Running this gives me these results (computer specs at the end of the post):
Total Time For Linked: 18054ms.
Total Time For Array: 7240ms.
Regular Threads
Nothing too surprising: the ArrayBlockingQueue is roughly twice as fast.

But those were virtual threads; let’s modify our function to use platform threads:
public static void doThreadStuff(BlockingQueue<Long> queue, CountDownLatch latch) {
    // Same producer logic, but each producer gets a dedicated platform thread.
    LongStream.range(0, NUM_PRODUCER_THREADS).forEach(i -> {
        var t = new Thread(() -> {
            for (var j = 0; j < NUM_MESSAGES; j++) {
                try {
                    queue.put(i);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t.start();
    });
    // Same consumer logic, also on platform threads.
    LongStream.range(0, NUM_CONSUMER_THREADS).forEach(i -> {
        var t = new Thread(() -> {
            while (true) {
                try {
                    var _thing = queue.take();
                    latch.countDown();
                    if (latch.getCount() <= 0) {
                        break;
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t.start();
    });
}
When running this, the results become considerably more interesting:
Total Time For Linked: 5923ms.
Total Time For Array: 6842ms.
ArrayBlockingQueue is slower now! Or, more accurately, the two are roughly the same speed, and if we run this again:
Total Time For Linked: 6501ms.
Total Time For Array: 6453ms.
For reasons that I don’t fully understand yet, when using vanilla platform threads, ArrayBlockingQueues operate at roughly the same speed as LinkedBlockingQueues.
Doing some digging, I think it’s because BlockingQueues use ReentrantLocks, which play unusually well with virtual threads (a virtual thread that blocks on a java.util.concurrent lock can unmount from its carrier thread rather than pinning it), but that’s honestly just a guess.
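For a rough mental model of what those locks are doing: in OpenJDK, ArrayBlockingQueue guards both ends of the queue with a single ReentrantLock plus two Conditions, while LinkedBlockingQueue uses separate put and take locks. A toy sketch in the single-lock style (TinyQueue is made up for this post; the real implementation is considerably more careful) might look like:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// A toy bounded queue guarded by one ReentrantLock, loosely in the
// style of ArrayBlockingQueue. Illustration only.
public class TinyQueue<E> {
    private final Object[] items;
    private int head, tail, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public TinyQueue(int capacity) { items = new Object[capacity]; }

    public void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) notFull.await(); // block while full
            items[tail] = e;
            tail = (tail + 1) % items.length;
            count++;
            notEmpty.signal(); // wake one blocked taker
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public E take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) notEmpty.await(); // block while empty
            E e = (E) items[head];
            head = (head + 1) % items.length;
            count--;
            notFull.signal(); // wake one blocked putter
            return e;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        TinyQueue<Integer> q = new TinyQueue<>(2);
        q.put(1);
        q.put(2);
        System.out.println(q.take()); // 1
        System.out.println(q.take()); // 2
    }
}
```

Because a single lock serializes both producers and consumers, it is at least plausible that the scheduling behavior of the threads contending for it matters as much as the backing data structure does.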
Conclusion
Anyway, a shorter post this time; I just wanted to share a result that I thought was interesting. I want to get into the habit of doing this more often.
System used for testing:
Operating System: NixOS Unstable, updated this morning.
RAM: 64GB LPDDR5 6400 MT/s
Processor: AMD Ryzen 7 PRO 7840U w/ Radeon 780M Graphics
JVM: jdk21 package in Nixpkgs