/*
 * Source: GitHub Gist by @vietj (vietj/cd1567e78579a6c1ff8ce53f21796d17),
 * last active December 1, 2025 09:54.
 */
package io.vertx.tests.vertx;
import io.vertx.test.core.AsyncTestBase;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class CustomSchedulerTest extends AsyncTestBase {
/**
 * Handle that lets a task block itself and be released later on.
 *
 * <p>The implementations in this file back it with a {@code CountDownLatch} and throw
 * {@link IllegalStateException} from {@link #resume()} when no {@link #suspend()} has
 * been issued yet.
 */
public interface ExecutionContext {

  /**
   * Blocks the calling thread until {@link #resume()} is invoked.
   *
   * @throws InterruptedException if the thread is interrupted while waiting
   */
  void suspend() throws InterruptedException;

  /**
   * Releases the thread waiting in {@link #suspend()}.
   */
  void resume();
}
/**
 * An {@link Executor} whose tasks can obtain an {@link ExecutionContext} in order to
 * suspend themselves and be resumed later.
 */
public interface Scheduler extends Executor {

  /**
   * Returns a context through which the caller can suspend and later be resumed.
   *
   * @return a new execution context
   */
  ExecutionContext executionContext();
}
/**
 * Virtual thread scheduler that runs every task on one carrier thread and holds back newly
 * scheduled tasks while a task that obtained an {@link ExecutionContext} still has to be
 * continued.
 *
 * <p>{@code running} and {@code pending} are only touched by code executing on the carrier
 * thread. {@code continuations} is also written by {@link ExecutionContext#resume()}, which can
 * be called from any thread, hence the synchronized wrapper.
 */
public static class CustomScheduler implements Thread.VirtualThreadScheduler, Scheduler {

  // Single carrier: tasks are handled strictly one after the other. The thread is a daemon so
  // that a scheduler which is never shut down cannot keep the JVM alive.
  private final Executor carrier = Executors.newFixedThreadPool(1, task -> {
    Thread thread = new Thread(task, "custom-scheduler-carrier");
    thread.setDaemon(true);
    return thread;
  });

  // Virtual thread whose task is currently executing on the carrier, null when idle.
  private Thread running;

  // Threads that obtained an execution context and whose continuation has not run yet.
  // Synchronized because resume() adds to it from outside the carrier thread.
  private final Set<Thread> continuations = Collections.synchronizedSet(new HashSet<>());

  // Tasks that showed up while a continuation was outstanding (carrier thread only).
  private final Deque<Thread.VirtualThreadTask> pending = new ArrayDeque<>();

  private final ThreadFactory factory;

  public CustomScheduler() {
    factory = Thread.ofVirtual().scheduler(this).factory();
  }

  /**
   * Starts {@code runnable} on a new virtual thread scheduled by this instance.
   *
   * @param runnable the task to run
   */
  @Override
  public void execute(Runnable runnable) {
    factory.newThread(runnable).start();
  }

  // NOTE(review): presumably invoked by the runtime when a virtual thread is first scheduled;
  // the work is always forwarded to the carrier thread.
  @Override
  public void onStart(Thread.VirtualThreadTask virtualThreadTask) {
    carrier.execute(() -> handle(virtualThreadTask));
  }

  /**
   * Decides, on the carrier thread, whether a task runs now or is parked in {@code pending}.
   *
   * <ul>
   *   <li>no outstanding continuation: run the task immediately;</li>
   *   <li>the task belongs to a thread with an outstanding continuation: run it, then drain
   *       {@code pending} for as long as no continuation is outstanding;</li>
   *   <li>otherwise: queue the task.</li>
   * </ul>
   */
  void handle(Thread.VirtualThreadTask virtualThreadTask) {
    if (continuations.isEmpty()) {
      run(virtualThreadTask);
    } else if (continuations.remove(virtualThreadTask.thread())) {
      run(virtualThreadTask);
      // Re-check before every pending task: a drained task may itself have registered a
      // continuation, in which case the remaining tasks must keep waiting exactly like
      // freshly submitted ones do in the branch below.
      while (continuations.isEmpty() && (virtualThreadTask = pending.poll()) != null) {
        run(virtualThreadTask);
      }
    } else {
      pending.add(virtualThreadTask);
    }
  }

  /** Runs the task while exposing its thread through {@code running}. */
  void run(Thread.VirtualThreadTask virtualThreadTask) {
    running = virtualThreadTask.thread();
    try {
      virtualThreadTask.run();
    } finally {
      running = null;
    }
  }

  // NOTE(review): presumably invoked by the runtime when a parked virtual thread becomes
  // runnable again; handled exactly like a start.
  @Override
  public void onContinue(Thread.VirtualThreadTask virtualThreadTask) {
    carrier.execute(() -> handle(virtualThreadTask));
  }

  /**
   * Registers the calling thread as having an outstanding continuation and returns the context
   * used to suspend and resume it.
   *
   * @return the execution context bound to the calling thread
   * @throws IllegalStateException if the caller is not the thread currently run by this scheduler
   */
  @Override
  public ExecutionContext executionContext() {
    Thread th = Thread.currentThread();
    if (running != th) {
      throw new IllegalStateException("Not called from the task currently run by this scheduler");
    }
    continuations.add(th);
    return new ExecutionContext() {

      // Written by suspend() on the virtual thread and read by resume() on any thread:
      // volatile so that resume() reliably observes a latch published earlier.
      private volatile CountDownLatch latch;

      @Override
      public void suspend() throws InterruptedException {
        CountDownLatch l = new CountDownLatch(1);
        latch = l;
        l.await();
      }

      @Override
      public void resume() {
        CountDownLatch l = latch;
        if (l == null) {
          throw new IllegalStateException("resume() called before suspend()");
        }
        // Mark the thread before waking it up so that its continuation is recognised by handle().
        continuations.add(th);
        l.countDown();
      }
    };
  }
}
/** Runs the shared race scenario against {@link CustomScheduler}. */
@Test
public void testRace() throws Exception {
  Scheduler scheduler = new CustomScheduler();
  testRace(scheduler);
}
/**
 * Submits a task that suspends itself while a separate thread submits a second task and then
 * resumes the first one, and asserts that the recorded order is {@code 1, 2, 3}: the second task
 * must only run after the first one has been resumed and has finished.
 *
 * @param scheduler the scheduler under test
 * @throws Exception if the polling thread is interrupted
 */
private void testRace(Scheduler scheduler) throws Exception {
  List<Integer> sequence = Collections.synchronizedList(new ArrayList<>());
  scheduler.execute(() -> {
    ExecutionContext context = scheduler.executionContext();
    // A thread outside the scheduler submits the follow-up task first and resumes afterwards.
    new Thread(() -> {
      scheduler.execute(() -> {
        sequence.add(3);
      });
      context.resume();
    }).start();
    sequence.add(1);
    try {
      context.suspend();
      sequence.add(2);
    } catch (InterruptedException e) {
      // Nothing to record, but keep the interrupt visible to whoever runs this task.
      Thread.currentThread().interrupt();
    }
  });
  // Monotonic clock: the 20 second budget must not be affected by wall-clock adjustments.
  long timeoutNanos = 20_000_000_000L;
  long start = System.nanoTime();
  while (true) {
    assertTrue(System.nanoTime() - start < timeoutNanos);
    if (sequence.size() == 3) {
      assertEquals(List.of(1, 2, 3), sequence);
      break;
    }
    Thread.sleep(1);
  }
}
/**
 * Runs the shared race scenario against a lock based scheduler: every task runs on its own
 * virtual thread while holding {@code lock}, and a task only starts once no resumed context is
 * waiting to get the lock back.
 */
@Test
public void testRace2() throws Exception {
  class SchedulerImpl implements Scheduler {
    private final Lock lock = new ReentrantLock();
    private final ThreadFactory factory = Thread.ofVirtual().factory();
    // Latches of contexts that were resumed but have not re-acquired the lock yet.
    // resume() adds from a thread that does not hold the lock, so the collection itself
    // has to be thread-safe.
    private final Collection<CountDownLatch> continuations =
      Collections.synchronizedCollection(new ArrayDeque<>());

    @Override
    public ExecutionContext executionContext() {
      return new ExecutionContext() {

        // Published by suspend() and consumed by resume() on another thread.
        private volatile CountDownLatch latch;

        @Override
        public void suspend() throws InterruptedException {
          CountDownLatch l = new CountDownLatch(1);
          latch = l;
          lock.unlock();
          try {
            // Wait on the local: resume() may already have reset the field to null.
            l.await();
          } finally {
            // Always take the lock back, also when interrupted, because execute()
            // unconditionally unlocks once the task returns.
            lock.lock();
            continuations.remove(l);
          }
        }

        @Override
        public void resume() {
          CountDownLatch l = latch;
          if (l == null) {
            throw new IllegalStateException();
          }
          latch = null;
          // Register before waking up so that new tasks keep yielding until the resumed
          // task has taken the lock back.
          continuations.add(l);
          l.countDown();
        }
      };
    }

    @Override
    public void execute(Runnable runnable) {
      factory.newThread(() -> {
        while (true) {
          lock.lock();
          if (continuations.isEmpty()) {
            try {
              runnable.run();
            } finally {
              lock.unlock();
            }
            break;
          } else {
            // A resumed task is waiting for the lock: step aside and retry.
            lock.unlock();
            Thread.yield();
          }
        }
      }).start();
    }
  }
  testRace(new SchedulerImpl());
}
}
// GitHub page footer (not part of the source): Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.