Created
February 8, 2023 14:42
-
-
Save Goldmensch/69bd2bfc43ae28e69851f7811e4254ad to your computer and use it in GitHub Desktop.
Java implementation of Vyukov Mpsc using java 19 preview
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.util.concurrent.locks.Lock; | |
| import java.util.concurrent.locks.LockSupport; | |
| import java.util.concurrent.locks.ReentrantLock; | |
| import java.util.function.BooleanSupplier; | |
| class Parker { | |
| private final Lock lock = new ReentrantLock(); | |
| private final WaitList waiters = new WaitList(); | |
| void park(BooleanSupplier pred) { | |
| if (pred.getAsBoolean()) return; | |
| Thread current = Thread.currentThread(); | |
| for (int i = 0; i < 6; i++) { | |
| if (pred.getAsBoolean()) return; | |
| for (int j = 0; j < (i*i); j++) Thread.onSpinWait(); | |
| } | |
| var node = new Waiter(current); | |
| lock.lock(); | |
| try { | |
| waiters.add(node); | |
| } finally { | |
| lock.unlock(); | |
| } | |
| boolean interrupted = false; | |
| while (!pred.getAsBoolean()) { | |
| LockSupport.park(this); | |
| if (Thread.interrupted()) { | |
| interrupted = true; | |
| } | |
| } | |
| lock.lock(); | |
| try { | |
| waiters.remove(node); | |
| } finally { | |
| lock.unlock(); | |
| } | |
| if (interrupted) { | |
| current.interrupt(); | |
| } | |
| } | |
| public void unparkOne() { | |
| lock.lock(); | |
| try { | |
| LockSupport.unpark(waiters.peek()); | |
| } finally { | |
| lock.unlock(); | |
| } | |
| } | |
| public void unparkAll() { | |
| lock.lock(); | |
| try { | |
| var curr = waiters.head; | |
| do { | |
| LockSupport.unpark(curr.thread); | |
| } while ((curr = curr.next) != null); | |
| } finally { | |
| lock.unlock(); | |
| } | |
| } | |
| private class Waiter { | |
| Thread thread; | |
| private Waiter prev; | |
| private Waiter next; | |
| private Waiter(Thread thread) { | |
| this.thread = thread; | |
| } | |
| } | |
| private class WaitList { | |
| private Waiter head; | |
| private Waiter tail; | |
| public void add(Waiter node) { | |
| Waiter tail = this.tail; | |
| this.tail = node; | |
| node.prev = tail; | |
| node.next = null; | |
| if (tail != null) { | |
| tail.next = node; | |
| } else { | |
| head = node; | |
| } | |
| } | |
| public void remove(Waiter node) { | |
| Waiter prev = node.prev; | |
| Waiter next = node.next; | |
| if (prev != null) { | |
| prev.next = next; | |
| if (next != null) { | |
| next.prev = prev; | |
| } else { | |
| tail = prev; | |
| } | |
| } else { | |
| head = next; | |
| if (head == null) { | |
| tail = null; | |
| } | |
| } | |
| } | |
| public Thread peek() { | |
| if (tail == null) return null; | |
| return tail.thread; | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.util.concurrent.locks.LockSupport; | |
| import java.util.function.BooleanSupplier; | |
| class ParkerSync { | |
| private final WaitList waiters = new WaitList(); | |
| void park(BooleanSupplier pred) { | |
| if (pred.getAsBoolean()) return; | |
| Thread current = Thread.currentThread(); | |
| var node = new Waiter(current); | |
| synchronized (waiters) { | |
| waiters.add(node); | |
| } | |
| boolean interrupted = false; | |
| while (!pred.getAsBoolean()) { | |
| LockSupport.park(this); | |
| if (Thread.interrupted()) { | |
| interrupted = true; | |
| } | |
| } | |
| synchronized (waiters) { | |
| waiters.remove(node); | |
| } | |
| if (interrupted) { | |
| current.interrupt(); | |
| } | |
| } | |
| public void unparkOne() { | |
| synchronized (waiters) { | |
| LockSupport.unpark(waiters.peek()); | |
| } | |
| } | |
| public void unparkAll() { | |
| synchronized (waiters) { | |
| var curr = waiters.head; | |
| do { | |
| LockSupport.unpark(curr.thread); | |
| } while ((curr = curr.next) != null); | |
| } | |
| } | |
| private class Waiter { | |
| Thread thread; | |
| private Waiter prev; | |
| private Waiter next; | |
| private Waiter(Thread thread) { | |
| this.thread = thread; | |
| } | |
| } | |
| private class WaitList { | |
| private Waiter head; | |
| private Waiter tail; | |
| public void add(Waiter node) { | |
| Waiter tail = this.tail; | |
| this.tail = node; | |
| node.prev = tail; | |
| node.next = null; | |
| if (tail != null) { | |
| tail.next = node; | |
| } else { | |
| head = node; | |
| } | |
| } | |
| public void remove(Waiter node) { | |
| Waiter prev = node.prev; | |
| Waiter next = node.next; | |
| if (prev != null) { | |
| prev.next = next; | |
| if (next != null) { | |
| next.prev = prev; | |
| } else { | |
| tail = prev; | |
| } | |
| } else { | |
| head = next; | |
| if (head == null) { | |
| tail = null; | |
| } | |
| } | |
| } | |
| public Thread peek() { | |
| if (tail == null) return null; | |
| return tail.thread; | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import jdk.internal.vm.annotation.Contended; | |
| import java.util.concurrent.atomic.AtomicLong; | |
| /** | |
| A modified version of | |
| <a href="https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue">Dmitry Vyukov's bounded MPMC queue</a>, | |
| adapted for single-consumer usage: | |
| <p> | |
| Most notably, allowing only a single thread to receive at | |
| the same time greatly simplifies the dequeue logic and | |
| removes the need for some atomic operations. | |
| <p> | |
| Mostly adopted from | |
| <a href="https://gist.github.com/vbe0201/2f30163415e6e99dafe3045d8d254b4f">Vale's zig implemenation</a> | |
| @param <T> The queue type */ | |
| @SuppressWarnings({"jol", "unused"}) | |
| public class VyukovMpscAtomic<T> { | |
| private final Slot<T>[] slots; | |
| @Contended | |
| private AtomicLong enqueuePos = new AtomicLong(); | |
| @Contended | |
| private AtomicLong dequeuePos = new AtomicLong(); | |
| private final long slotsMask; | |
| /** | |
| @param capacity the capacity of the list, must be power of 2 | |
| @throws IllegalArgumentException if the capacity isn't a power of 2 | |
| */ | |
| @SuppressWarnings("unchecked") | |
| public VyukovMpscAtomic(int capacity) { | |
| if (capacity < 2 || (capacity & (capacity - 1)) != 0) { | |
| throw new IllegalArgumentException("capacity must be power of 2"); | |
| } | |
| slots = new Slot[capacity]; | |
| for (int i = 0; i < slots.length; i++) { | |
| slots[i] = new Slot<>(i); | |
| } | |
| slotsMask = capacity - 1; | |
| } | |
| /** | |
| @return gets the queue's capacity | |
| */ | |
| public long length() { | |
| return slotsMask + 1; | |
| } | |
| /** | |
| @param value the value to be pushed | |
| @return whether the operation was successful | |
| */ | |
| public boolean push(T value) { | |
| var pos = enqueuePos.get(); | |
| while (true) { | |
| var slot = slots[(int) (pos & slotsMask)]; | |
| var cmp = slot.sequence.getAcquire() - pos; | |
| if (cmp == 0) { | |
| if (enqueuePos.weakCompareAndSet(pos, pos + 1)) { | |
| slot.value = value; | |
| slot.sequence.setRelease(pos + 1); | |
| return true; | |
| } | |
| pos = enqueuePos.get(); | |
| } else if (cmp < 0) { | |
| return false; | |
| } else { | |
| pos = enqueuePos.get(); | |
| } | |
| } | |
| } | |
| /** | |
| @return the value or null if the queue is empty | |
| */ | |
| public T pop() { | |
| var pos = dequeuePos.get(); | |
| var slot = slots[(int) (pos & slotsMask)]; | |
| pos += 1; | |
| var stamp = slot.sequence.getAcquire(); | |
| if (stamp == pos) { | |
| dequeuePos.set(pos); | |
| var value = slot.value; | |
| slot.sequence.setRelease(pos + slotsMask); | |
| return value; | |
| } | |
| return null; | |
| } | |
| private static final class Slot<T> { | |
| AtomicLong sequence; | |
| T value = null; | |
| private Slot(long sequence) { | |
| this.sequence = new AtomicLong(sequence); | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import jdk.internal.vm.annotation.Contended; | |
| import java.lang.invoke.MethodHandles; | |
| import java.lang.invoke.VarHandle; | |
| import java.util.concurrent.atomic.AtomicLong; | |
| /** | |
| A modified version of | |
| <a href="https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue">Dmitry Vyukov's bounded MPMC queue</a>, | |
| adapted for single-consumer usage: | |
| <p> | |
| Most notably, allowing only a single thread to receive at | |
| the same time greatly simplifies the dequeue logic and | |
| removes the need for some atomic operations. | |
| <p> | |
| Mostly adopted from | |
| <a href="https://gist.github.com/vbe0201/2f30163415e6e99dafe3045d8d254b4f">Vale's zig implemenation</a> | |
| @param <T> The queue type */ | |
| @SuppressWarnings({"jol", "unused"}) | |
| public class VyukovMpscNNM<T> { | |
| private static final VarHandle ENQUEUE_POS; | |
| private static final VarHandle DEQUEUE_POS; | |
| static { | |
| try { | |
| ENQUEUE_POS = MethodHandles.lookup().findVarHandle(VyukovMpscNNM.class, "enqueuePos", long.class); | |
| DEQUEUE_POS = MethodHandles.lookup().findVarHandle(VyukovMpscNNM.class, "dequeuePos", long.class); | |
| } catch (NoSuchFieldException | IllegalAccessException e) { | |
| throw new RuntimeException(e); | |
| } | |
| } | |
| private final Slot<T>[] slots; | |
| @Contended | |
| private volatile long enqueuePos = 0; | |
| @Contended | |
| private volatile long dequeuePos = 0; | |
| private final long slotsMask; | |
| /** | |
| @param capacity the capacity of the list, must be power of 2 | |
| @throws IllegalArgumentException if the capacity isn't a power of 2 | |
| */ | |
| @SuppressWarnings("unchecked") | |
| public VyukovMpscNNM(int capacity) { | |
| if (capacity < 2 || (capacity & (capacity - 1)) != 0) { | |
| throw new IllegalArgumentException("capacity must be power of 2"); | |
| } | |
| slots = new Slot[capacity]; | |
| for (int i = 0; i < slots.length; i++) { | |
| slots[i] = new Slot<>(i); | |
| } | |
| slotsMask = capacity - 1; | |
| } | |
| /** | |
| @return gets the queue's capacity | |
| */ | |
| public long length() { | |
| return slotsMask + 1; | |
| } | |
| /** | |
| @param value the value to be pushed | |
| @return whether the operation was successful | |
| */ | |
| public boolean push(T value) { | |
| var pos = (long) ENQUEUE_POS.get(this); | |
| while (true) { | |
| var slot = slots[(int) (pos & slotsMask)]; | |
| var cmp = slot.sequence.getAcquire() - pos; | |
| if (cmp == 0) { | |
| if (ENQUEUE_POS.weakCompareAndSet(this, pos, pos + 1)) { | |
| slot.value = value; | |
| slot.sequence.setRelease(pos + 1); | |
| return true; | |
| } | |
| pos = (long) ENQUEUE_POS.get(this); | |
| } else if (cmp < 0) { | |
| return false; | |
| } else { | |
| pos = (long) ENQUEUE_POS.get(this); | |
| } | |
| } | |
| } | |
| /** | |
| @return the value or null if the queue is empty | |
| */ | |
| public T pop() { | |
| var pos = (long) DEQUEUE_POS.get(this); | |
| var slot = slots[(int) (pos & slotsMask)]; | |
| pos += 1; | |
| var stamp = slot.sequence.getAcquire(); | |
| if (stamp == pos) { | |
| DEQUEUE_POS.set(this, pos); | |
| var value = slot.value; | |
| slot.sequence.setRelease(pos + slotsMask); | |
| return value; | |
| } | |
| return null; | |
| } | |
| private static final class Slot<T> { | |
| AtomicLong sequence; | |
| T value = null; | |
| private Slot(long sequence) { | |
| this.sequence = new AtomicLong(sequence); | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.lang.foreign.MemoryLayout; | |
| import java.lang.foreign.MemorySegment; | |
| import java.lang.foreign.MemorySession; | |
| import java.lang.foreign.ValueLayout; | |
| import java.lang.invoke.VarHandle; | |
| import java.util.concurrent.atomic.AtomicLong; | |
| /** | |
| A modified version of | |
| <a href="https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue">Dmitry Vyukov's bounded MPMC queue</a>, | |
| adapted for single-consumer usage: | |
| <p> | |
| Most notably, allowing only a single thread to receive at | |
| the same time greatly simplifies the dequeue logic and | |
| removes the need for some atomic operations. | |
| <p> | |
| Mostly adopted from | |
| <a href="https://gist.github.com/vbe0201/2f30163415e6e99dafe3045d8d254b4f">Vale's zig implemenation</a> | |
| @param <T> The queue type */ | |
| public class VyukovMpsc<T> implements AutoCloseable { | |
| private static final int CACHE_PAD = 1024; | |
| private static final MemoryLayout POSITIONS_LAYOUT = MemoryLayout.structLayout( | |
| ValueLayout.JAVA_LONG.withName("enqueuePos").withBitAlignment(CACHE_PAD), | |
| MemoryLayout.paddingLayout(CACHE_PAD - 64), | |
| ValueLayout.JAVA_LONG.withName("dequeuePos") | |
| ); | |
| private static final VarHandle ENQUEUE_POS = POSITIONS_LAYOUT.varHandle(MemoryLayout.PathElement.groupElement( | |
| "enqueuePos")); | |
| private static final VarHandle DEQUEUE_POS = POSITIONS_LAYOUT.varHandle(MemoryLayout.PathElement.groupElement( | |
| "dequeuePos")); | |
| private final MemorySegment positions = MemorySegment.allocateNative(POSITIONS_LAYOUT, MemorySession.openShared()); | |
| private final Slot<T>[] slots; | |
| private final long slotsMask; | |
| /** | |
| @param capacity the capacity of the list, must be power of 2 | |
| @throws IllegalArgumentException if the capacity isn't a power of 2 | |
| */ | |
| @SuppressWarnings("unchecked") | |
| public VyukovMpsc(int capacity) { | |
| if (capacity < 2 || (capacity & (capacity - 1)) != 0) { | |
| throw new IllegalArgumentException("capacity must be power of 2"); | |
| } | |
| slots = new Slot[capacity]; | |
| for (int i = 0; i < slots.length; i++) { | |
| slots[i] = new Slot<>(i); | |
| } | |
| ENQUEUE_POS.set(positions, 0); | |
| DEQUEUE_POS.set(positions, 0); | |
| slotsMask = capacity - 1; | |
| } | |
| /** | |
| @return gets the queue's capacity | |
| */ | |
| public long length() { | |
| return slotsMask + 1; | |
| } | |
| /** | |
| @param value the value to be pushed | |
| @return whether the operation was successful | |
| */ | |
| public boolean push(T value) { | |
| var pos = (long) ENQUEUE_POS.get(positions); | |
| while (true) { | |
| var slot = slots[(int) (pos & slotsMask)]; | |
| var cmp = slot.sequence.getAcquire() - pos; | |
| if (cmp == 0) { | |
| if (ENQUEUE_POS.weakCompareAndSet(positions, pos, pos + 1)) { | |
| slot.value = value; | |
| slot.sequence.setRelease(pos + 1); | |
| return true; | |
| } | |
| pos = (long) ENQUEUE_POS.get(positions); | |
| } else if (cmp < 0) { | |
| return false; | |
| } else { | |
| pos = (long) ENQUEUE_POS.get(positions); | |
| } | |
| } | |
| } | |
| /** | |
| @return the value or null if the queue is empty | |
| */ | |
| public T pop() { | |
| var pos = (long) DEQUEUE_POS.get(positions); | |
| var slot = slots[(int) (pos & slotsMask)]; | |
| pos += 1; | |
| var stamp = slot.sequence.getAcquire(); | |
| if (stamp == pos) { | |
| DEQUEUE_POS.set(positions, pos); | |
| var value = slot.value; | |
| slot.sequence.setRelease(pos + slotsMask); | |
| return value; | |
| } | |
| return null; | |
| } | |
| @Override | |
| public void close() { | |
| positions.session().close(); | |
| } | |
| private static final class Slot<T> { | |
| AtomicLong sequence; | |
| T value = null; | |
| private Slot(long sequence) { | |
| this.sequence = new AtomicLong(sequence); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment