Created
November 4, 2025 03:13
-
-
Save godmar/543e19cd3408112d83638dc2674d20b3 to your computer and use it in GitHub Desktop.
This demo shows how to leverage virtual threads to create deterministic coroutines.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // @author godmar@gmail.com | |
| /* Code demonstration of how to build deterministic coroutines on top of | |
| * virtual threads. | |
| * | |
| * Run with | |
| * java -Djdk.virtualThreadScheduler.parallelism=1 -Djdk.virtualThreadScheduler.maxPoolSize=1 CommandDemo | |
| */ | |
| import java.util.*; | |
| import java.util.concurrent.locks.*; | |
| /* Begin user code ***********/ | |
| class Command_One extends CommandDemo.Command { | |
| Command_One(String name) { super(name); } | |
| @Override | |
| public void run(CommandDemo.Scheduler sched) { | |
| for (int i = 0; i < 5; i++) { | |
| System.out.println("I am " + name + ": " + i); | |
| sched.yield(); | |
| } | |
| } | |
| } | |
| /* End user code ***********/ | |
| public class CommandDemo | |
| { | |
| @FunctionalInterface | |
| interface ScopedLock extends AutoCloseable { | |
| @Override | |
| void close(); // no 'throws Exception' | |
| } | |
| static ScopedLock withLock(ReentrantLock lock) { | |
| lock.lock(); | |
| return lock::unlock; | |
| } | |
| static class Scheduler { | |
| private Deque<Command> readyQueue = new ArrayDeque<>(); | |
| private Command current; | |
| private ThreadLocal<Command> vthread2command = new ThreadLocal<>(); | |
| private int numCommands; | |
| private ReentrantLock schedLock = new ReentrantLock(); | |
| private Condition schedCond = schedLock.newCondition(); | |
| enum SchedReason { WAIT_FOR_END, DIE, YIELD_OR_BLOCK }; | |
| void schedule(Command cmd) { | |
| cmd.sched = this; | |
| numCommands++; | |
| makeReady(cmd); | |
| Thread.ofVirtual().start(() -> { | |
| vthread2command.set(cmd); | |
| cmd.waitUntil(State.RUNNING); | |
| cmd.run(); | |
| runNextCommand(SchedReason.DIE); | |
| }); | |
| } | |
| void yield() { | |
| makeReady(vthread2command.get()); | |
| runNextCommand(SchedReason.YIELD_OR_BLOCK); | |
| } | |
| void runNextCommand(SchedReason reason) { | |
| var prev = vthread2command.get(); | |
| current = null; | |
| if (!readyQueue.isEmpty()) { | |
| current = readyQueue.poll(); | |
| if (prev == current) | |
| return; | |
| current.changeStateTo(State.RUNNING); | |
| } | |
| switch (reason) { | |
| case SchedReason.DIE: /* command exit */ | |
| prev.changeStateTo(State.DEAD); | |
| try (var ignored = withLock(schedLock)) { | |
| numCommands--; | |
| schedCond.signal(); | |
| } | |
| break; | |
| case SchedReason.WAIT_FOR_END: /* main thread */ | |
| try (var ignored = withLock(schedLock)) { | |
| try { | |
| while (numCommands > 0) | |
| schedCond.await(); | |
| } catch (InterruptedException _ie) { } | |
| } | |
| break; | |
| case SchedReason.YIELD_OR_BLOCK: /* yielding, wait for reschedule */ | |
| prev.waitUntil(State.RUNNING); | |
| break; | |
| } | |
| } | |
| void makeReady(Command cmd) { | |
| readyQueue.offer(cmd); | |
| cmd.changeStateTo(State.READY); | |
| } | |
| void runCommands() { | |
| runNextCommand(SchedReason.WAIT_FOR_END); | |
| } | |
| } | |
| static enum State { | |
| DEAD, READY, RUNNING, /* not yet implemented */ BLOCKED; | |
| } | |
| static abstract class Command implements Runnable { | |
| private State state; | |
| private ReentrantLock cmdLock = new ReentrantLock(); | |
| private Condition cmdCond = cmdLock.newCondition(); | |
| protected String name; | |
| protected Scheduler sched; | |
| Command(String name) { | |
| this.name = name; | |
| } | |
| @Override | |
| public String toString() { | |
| return "Command[" + name + "]"; | |
| } | |
| abstract public void run(Scheduler sched); | |
| void waitUntil(State state) { | |
| try (var ignored = withLock(cmdLock)) { | |
| while (state != this.state) { | |
| try { | |
| cmdCond.await(); | |
| } catch (InterruptedException _ie) { } | |
| } | |
| } | |
| } | |
| void changeStateTo(State state) { | |
| try (var ignored = withLock(cmdLock)) { | |
| this.state = state; | |
| cmdCond.signal(); | |
| } | |
| } | |
| @Override | |
| public void run() { | |
| run(sched); | |
| } | |
| } | |
| public static void main(String []av) { | |
| var sched = new Scheduler(); | |
| for (int i = 0; i < 4; i++) | |
| sched.schedule(new Command_One("Command #" + i)); | |
| sched.runCommands(); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment