Skip to content

Instantly share code, notes, and snippets.

@Goldmensch
Created February 8, 2023 14:42
Show Gist options
  • Select an option

  • Save Goldmensch/69bd2bfc43ae28e69851f7811e4254ad to your computer and use it in GitHub Desktop.

Select an option

Save Goldmensch/69bd2bfc43ae28e69851f7811e4254ad to your computer and use it in GitHub Desktop.
Java implementation of Vyukov Mpsc using java 19 preview
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;
class Parker {
private final Lock lock = new ReentrantLock();
private final WaitList waiters = new WaitList();
void park(BooleanSupplier pred) {
if (pred.getAsBoolean()) return;
Thread current = Thread.currentThread();
for (int i = 0; i < 6; i++) {
if (pred.getAsBoolean()) return;
for (int j = 0; j < (i*i); j++) Thread.onSpinWait();
}
var node = new Waiter(current);
lock.lock();
try {
waiters.add(node);
} finally {
lock.unlock();
}
boolean interrupted = false;
while (!pred.getAsBoolean()) {
LockSupport.park(this);
if (Thread.interrupted()) {
interrupted = true;
}
}
lock.lock();
try {
waiters.remove(node);
} finally {
lock.unlock();
}
if (interrupted) {
current.interrupt();
}
}
public void unparkOne() {
lock.lock();
try {
LockSupport.unpark(waiters.peek());
} finally {
lock.unlock();
}
}
public void unparkAll() {
lock.lock();
try {
var curr = waiters.head;
do {
LockSupport.unpark(curr.thread);
} while ((curr = curr.next) != null);
} finally {
lock.unlock();
}
}
private class Waiter {
Thread thread;
private Waiter prev;
private Waiter next;
private Waiter(Thread thread) {
this.thread = thread;
}
}
private class WaitList {
private Waiter head;
private Waiter tail;
public void add(Waiter node) {
Waiter tail = this.tail;
this.tail = node;
node.prev = tail;
node.next = null;
if (tail != null) {
tail.next = node;
} else {
head = node;
}
}
public void remove(Waiter node) {
Waiter prev = node.prev;
Waiter next = node.next;
if (prev != null) {
prev.next = next;
if (next != null) {
next.prev = prev;
} else {
tail = prev;
}
} else {
head = next;
if (head == null) {
tail = null;
}
}
}
public Thread peek() {
if (tail == null) return null;
return tail.thread;
}
}
}
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;
class ParkerSync {
private final WaitList waiters = new WaitList();
void park(BooleanSupplier pred) {
if (pred.getAsBoolean()) return;
Thread current = Thread.currentThread();
var node = new Waiter(current);
synchronized (waiters) {
waiters.add(node);
}
boolean interrupted = false;
while (!pred.getAsBoolean()) {
LockSupport.park(this);
if (Thread.interrupted()) {
interrupted = true;
}
}
synchronized (waiters) {
waiters.remove(node);
}
if (interrupted) {
current.interrupt();
}
}
public void unparkOne() {
synchronized (waiters) {
LockSupport.unpark(waiters.peek());
}
}
public void unparkAll() {
synchronized (waiters) {
var curr = waiters.head;
do {
LockSupport.unpark(curr.thread);
} while ((curr = curr.next) != null);
}
}
private class Waiter {
Thread thread;
private Waiter prev;
private Waiter next;
private Waiter(Thread thread) {
this.thread = thread;
}
}
private class WaitList {
private Waiter head;
private Waiter tail;
public void add(Waiter node) {
Waiter tail = this.tail;
this.tail = node;
node.prev = tail;
node.next = null;
if (tail != null) {
tail.next = node;
} else {
head = node;
}
}
public void remove(Waiter node) {
Waiter prev = node.prev;
Waiter next = node.next;
if (prev != null) {
prev.next = next;
if (next != null) {
next.prev = prev;
} else {
tail = prev;
}
} else {
head = next;
if (head == null) {
tail = null;
}
}
}
public Thread peek() {
if (tail == null) return null;
return tail.thread;
}
}
}
import jdk.internal.vm.annotation.Contended;
import java.util.concurrent.atomic.AtomicLong;
/**
A modified version of
<a href="https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue">Dmitry Vyukov's bounded MPMC queue</a>,
adapted for single-consumer usage:
<p>
Most notably, allowing only a single thread to receive at
the same time greatly simplifies the dequeue logic and
removes the need for some atomic operations.
<p>
Mostly adopted from
<a href="https://gist.github.com/vbe0201/2f30163415e6e99dafe3045d8d254b4f">Vale's zig implemenation</a>
@param <T> The queue type */
@SuppressWarnings({"jol", "unused"})
public class VyukovMpscAtomic<T> {
private final Slot<T>[] slots;
@Contended
private AtomicLong enqueuePos = new AtomicLong();
@Contended
private AtomicLong dequeuePos = new AtomicLong();
private final long slotsMask;
/**
@param capacity the capacity of the list, must be power of 2
@throws IllegalArgumentException if the capacity isn't a power of 2
*/
@SuppressWarnings("unchecked")
public VyukovMpscAtomic(int capacity) {
if (capacity < 2 || (capacity & (capacity - 1)) != 0) {
throw new IllegalArgumentException("capacity must be power of 2");
}
slots = new Slot[capacity];
for (int i = 0; i < slots.length; i++) {
slots[i] = new Slot<>(i);
}
slotsMask = capacity - 1;
}
/**
@return gets the queue's capacity
*/
public long length() {
return slotsMask + 1;
}
/**
@param value the value to be pushed
@return whether the operation was successful
*/
public boolean push(T value) {
var pos = enqueuePos.get();
while (true) {
var slot = slots[(int) (pos & slotsMask)];
var cmp = slot.sequence.getAcquire() - pos;
if (cmp == 0) {
if (enqueuePos.weakCompareAndSet(pos, pos + 1)) {
slot.value = value;
slot.sequence.setRelease(pos + 1);
return true;
}
pos = enqueuePos.get();
} else if (cmp < 0) {
return false;
} else {
pos = enqueuePos.get();
}
}
}
/**
@return the value or null if the queue is empty
*/
public T pop() {
var pos = dequeuePos.get();
var slot = slots[(int) (pos & slotsMask)];
pos += 1;
var stamp = slot.sequence.getAcquire();
if (stamp == pos) {
dequeuePos.set(pos);
var value = slot.value;
slot.sequence.setRelease(pos + slotsMask);
return value;
}
return null;
}
private static final class Slot<T> {
AtomicLong sequence;
T value = null;
private Slot(long sequence) {
this.sequence = new AtomicLong(sequence);
}
}
}
import jdk.internal.vm.annotation.Contended;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.atomic.AtomicLong;
/**
A modified version of
<a href="https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue">Dmitry Vyukov's bounded MPMC queue</a>,
adapted for single-consumer usage:
<p>
Most notably, allowing only a single thread to receive at
the same time greatly simplifies the dequeue logic and
removes the need for some atomic operations.
<p>
Mostly adopted from
<a href="https://gist.github.com/vbe0201/2f30163415e6e99dafe3045d8d254b4f">Vale's zig implemenation</a>
@param <T> The queue type */
@SuppressWarnings({"jol", "unused"})
public class VyukovMpscNNM<T> {
private static final VarHandle ENQUEUE_POS;
private static final VarHandle DEQUEUE_POS;
static {
try {
ENQUEUE_POS = MethodHandles.lookup().findVarHandle(VyukovMpscNNM.class, "enqueuePos", long.class);
DEQUEUE_POS = MethodHandles.lookup().findVarHandle(VyukovMpscNNM.class, "dequeuePos", long.class);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
private final Slot<T>[] slots;
@Contended
private volatile long enqueuePos = 0;
@Contended
private volatile long dequeuePos = 0;
private final long slotsMask;
/**
@param capacity the capacity of the list, must be power of 2
@throws IllegalArgumentException if the capacity isn't a power of 2
*/
@SuppressWarnings("unchecked")
public VyukovMpscNNM(int capacity) {
if (capacity < 2 || (capacity & (capacity - 1)) != 0) {
throw new IllegalArgumentException("capacity must be power of 2");
}
slots = new Slot[capacity];
for (int i = 0; i < slots.length; i++) {
slots[i] = new Slot<>(i);
}
slotsMask = capacity - 1;
}
/**
@return gets the queue's capacity
*/
public long length() {
return slotsMask + 1;
}
/**
@param value the value to be pushed
@return whether the operation was successful
*/
public boolean push(T value) {
var pos = (long) ENQUEUE_POS.get(this);
while (true) {
var slot = slots[(int) (pos & slotsMask)];
var cmp = slot.sequence.getAcquire() - pos;
if (cmp == 0) {
if (ENQUEUE_POS.weakCompareAndSet(this, pos, pos + 1)) {
slot.value = value;
slot.sequence.setRelease(pos + 1);
return true;
}
pos = (long) ENQUEUE_POS.get(this);
} else if (cmp < 0) {
return false;
} else {
pos = (long) ENQUEUE_POS.get(this);
}
}
}
/**
@return the value or null if the queue is empty
*/
public T pop() {
var pos = (long) DEQUEUE_POS.get(this);
var slot = slots[(int) (pos & slotsMask)];
pos += 1;
var stamp = slot.sequence.getAcquire();
if (stamp == pos) {
DEQUEUE_POS.set(this, pos);
var value = slot.value;
slot.sequence.setRelease(pos + slotsMask);
return value;
}
return null;
}
private static final class Slot<T> {
AtomicLong sequence;
T value = null;
private Slot(long sequence) {
this.sequence = new AtomicLong(sequence);
}
}
}
import java.lang.foreign.MemoryLayout;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.MemorySession;
import java.lang.foreign.ValueLayout;
import java.lang.invoke.VarHandle;
import java.util.concurrent.atomic.AtomicLong;
/**
A modified version of
<a href="https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue">Dmitry Vyukov's bounded MPMC queue</a>,
adapted for single-consumer usage:
<p>
Most notably, allowing only a single thread to receive at
the same time greatly simplifies the dequeue logic and
removes the need for some atomic operations.
<p>
Mostly adopted from
<a href="https://gist.github.com/vbe0201/2f30163415e6e99dafe3045d8d254b4f">Vale's zig implemenation</a>
@param <T> The queue type */
public class VyukovMpsc<T> implements AutoCloseable {
private static final int CACHE_PAD = 1024;
private static final MemoryLayout POSITIONS_LAYOUT = MemoryLayout.structLayout(
ValueLayout.JAVA_LONG.withName("enqueuePos").withBitAlignment(CACHE_PAD),
MemoryLayout.paddingLayout(CACHE_PAD - 64),
ValueLayout.JAVA_LONG.withName("dequeuePos")
);
private static final VarHandle ENQUEUE_POS = POSITIONS_LAYOUT.varHandle(MemoryLayout.PathElement.groupElement(
"enqueuePos"));
private static final VarHandle DEQUEUE_POS = POSITIONS_LAYOUT.varHandle(MemoryLayout.PathElement.groupElement(
"dequeuePos"));
private final MemorySegment positions = MemorySegment.allocateNative(POSITIONS_LAYOUT, MemorySession.openShared());
private final Slot<T>[] slots;
private final long slotsMask;
/**
@param capacity the capacity of the list, must be power of 2
@throws IllegalArgumentException if the capacity isn't a power of 2
*/
@SuppressWarnings("unchecked")
public VyukovMpsc(int capacity) {
if (capacity < 2 || (capacity & (capacity - 1)) != 0) {
throw new IllegalArgumentException("capacity must be power of 2");
}
slots = new Slot[capacity];
for (int i = 0; i < slots.length; i++) {
slots[i] = new Slot<>(i);
}
ENQUEUE_POS.set(positions, 0);
DEQUEUE_POS.set(positions, 0);
slotsMask = capacity - 1;
}
/**
@return gets the queue's capacity
*/
public long length() {
return slotsMask + 1;
}
/**
@param value the value to be pushed
@return whether the operation was successful
*/
public boolean push(T value) {
var pos = (long) ENQUEUE_POS.get(positions);
while (true) {
var slot = slots[(int) (pos & slotsMask)];
var cmp = slot.sequence.getAcquire() - pos;
if (cmp == 0) {
if (ENQUEUE_POS.weakCompareAndSet(positions, pos, pos + 1)) {
slot.value = value;
slot.sequence.setRelease(pos + 1);
return true;
}
pos = (long) ENQUEUE_POS.get(positions);
} else if (cmp < 0) {
return false;
} else {
pos = (long) ENQUEUE_POS.get(positions);
}
}
}
/**
@return the value or null if the queue is empty
*/
public T pop() {
var pos = (long) DEQUEUE_POS.get(positions);
var slot = slots[(int) (pos & slotsMask)];
pos += 1;
var stamp = slot.sequence.getAcquire();
if (stamp == pos) {
DEQUEUE_POS.set(positions, pos);
var value = slot.value;
slot.sequence.setRelease(pos + slotsMask);
return value;
}
return null;
}
@Override
public void close() {
positions.session().close();
}
private static final class Slot<T> {
AtomicLong sequence;
T value = null;
private Slot(long sequence) {
this.sequence = new AtomicLong(sequence);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment