Last active
December 1, 2025 09:54
-
-
Save vietj/cd1567e78579a6c1ff8ce53f21796d17 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package io.vertx.tests.vertx; | |
| import io.vertx.test.core.AsyncTestBase; | |
| import org.junit.Test; | |
| import java.util.*; | |
| import java.util.concurrent.CountDownLatch; | |
| import java.util.concurrent.Executor; | |
| import java.util.concurrent.Executors; | |
| import java.util.concurrent.ThreadFactory; | |
| import java.util.concurrent.locks.Lock; | |
| import java.util.concurrent.locks.ReentrantLock; | |
| public class CustomSchedulerTest extends AsyncTestBase { | |
/**
 * Handle that lets the code which obtained it pause itself and be woken up later.
 */
public interface ExecutionContext {

  /**
   * Wakes up the party blocked in {@link #suspend()}.
   */
  void resume();

  /**
   * Blocks the caller until {@link #resume()} is invoked.
   *
   * @throws InterruptedException if the caller is interrupted while blocked
   */
  void suspend() throws InterruptedException;
}
| public interface Scheduler extends Executor { | |
| ExecutionContext executionContext(); | |
| } | |
| public static class CustomScheduler implements Thread.VirtualThreadScheduler, Scheduler { | |
| private final Executor carrier = Executors.newFixedThreadPool(1); | |
| private Thread running; | |
| private final Set<Thread> continuations = new HashSet<>(); | |
| private final Deque<Thread.VirtualThreadTask> pending = new ArrayDeque<>(); | |
| private final ThreadFactory factory; | |
| public CustomScheduler() { | |
| factory = Thread.ofVirtual().scheduler(this).factory(); | |
| } | |
| @Override | |
| public void execute(Runnable runnable) { | |
| factory.newThread(runnable).start(); | |
| } | |
| @Override | |
| public void onStart(Thread.VirtualThreadTask virtualThreadTask) { | |
| carrier.execute(() -> { | |
| handle(virtualThreadTask); | |
| }); | |
| } | |
| void handle(Thread.VirtualThreadTask virtualThreadTask) { | |
| if (continuations.isEmpty()) { | |
| run(virtualThreadTask); | |
| } else if (continuations.remove(virtualThreadTask.thread())) { | |
| run(virtualThreadTask); | |
| if (continuations.isEmpty()) { | |
| while ((virtualThreadTask = pending.poll()) != null) { | |
| run(virtualThreadTask); | |
| } | |
| } | |
| } else { | |
| pending.add(virtualThreadTask); | |
| } | |
| } | |
| void run(Thread.VirtualThreadTask virtualThreadTask) { | |
| running = virtualThreadTask.thread(); | |
| try { | |
| virtualThreadTask.run(); | |
| } finally { | |
| running = null; | |
| } | |
| } | |
| @Override | |
| public void onContinue(Thread.VirtualThreadTask virtualThreadTask) { | |
| carrier.execute(() -> { | |
| handle(virtualThreadTask); | |
| }); | |
| } | |
| public ExecutionContext executionContext() { | |
| Thread th = Thread.currentThread(); | |
| if (running != th) { | |
| throw new IllegalStateException(); | |
| } | |
| continuations.add(th); | |
| return new ExecutionContext() { | |
| CountDownLatch latch; | |
| @Override | |
| public void suspend() throws InterruptedException { | |
| latch = new CountDownLatch(1); | |
| latch.await(); | |
| } | |
| @Override | |
| public void resume() { | |
| if (latch == null) { | |
| throw new IllegalStateException(); | |
| } | |
| continuations.add(th); | |
| latch.countDown(); | |
| } | |
| }; | |
| } | |
| } | |
| @Test | |
| public void testRace() throws Exception { | |
| testRace(new CustomScheduler()); | |
| } | |
| private void testRace(Scheduler scheduler) throws Exception { | |
| List<Integer> sequence = Collections.synchronizedList(new ArrayList<>()); | |
| scheduler.execute(() -> { | |
| ExecutionContext context = scheduler.executionContext(); | |
| new Thread(() -> { | |
| scheduler.execute(() -> { | |
| sequence.add(3); | |
| }); | |
| context.resume(); | |
| }).start(); | |
| sequence.add(1); | |
| try { | |
| context.suspend(); | |
| sequence.add(2); | |
| } catch (InterruptedException ignore) { | |
| } | |
| }); | |
| long now = System.currentTimeMillis(); | |
| while (true) { | |
| assertTrue(System.currentTimeMillis() - now < 20_000); | |
| if (sequence.size() == 3) { | |
| assertEquals(List.of(1, 2, 3), sequence); | |
| break; | |
| } | |
| Thread.sleep(1); | |
| } | |
| } | |
| @Test | |
| public void testRace2() throws Exception { | |
| class SchedulerImpl implements Scheduler { | |
| private final Lock lock = new ReentrantLock(); | |
| private final ThreadFactory factory = Thread.ofVirtual().factory(); | |
| private final Deque<CountDownLatch> continuations = new ArrayDeque<>(); | |
| @Override | |
| public ExecutionContext executionContext() { | |
| return new ExecutionContext() { | |
| CountDownLatch latch; | |
| @Override | |
| public void suspend() throws InterruptedException { | |
| CountDownLatch l = new CountDownLatch(1); | |
| latch = l; | |
| lock.unlock(); | |
| latch.await(); | |
| lock.lock(); | |
| continuations.remove(l); | |
| } | |
| @Override | |
| public void resume() { | |
| CountDownLatch l = latch; | |
| if (l == null) { | |
| throw new IllegalStateException(); | |
| } | |
| latch = null; | |
| continuations.add(l); | |
| l.countDown(); | |
| } | |
| }; | |
| } | |
| @Override | |
| public void execute(Runnable runnable) { | |
| factory.newThread(() -> { | |
| while (true) { | |
| lock.lock(); | |
| if (continuations.isEmpty()) { | |
| try { | |
| runnable.run(); | |
| } finally { | |
| lock.unlock(); | |
| } | |
| break; | |
| } else { | |
| lock.unlock(); | |
| Thread.yield(); | |
| } | |
| } | |
| }).start(); | |
| } | |
| } | |
| testRace(new SchedulerImpl()); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment