Skip to content

Instantly share code, notes, and snippets.

@Ramblurr
Created January 16, 2026 09:18
Show Gist options
  • Select an option

  • Save Ramblurr/577be54e2e3ad3096a86807acd8c83bc to your computer and use it in GitHub Desktop.

Select an option

Save Ramblurr/577be54e2e3ad3096a86807acd8c83bc to your computer and use it in GitHub Desktop.
a blocking fan-in selector over multiple BlockingQueues that leaks memory
(ns queue
(:import [java.util.concurrent LinkedBlockingQueue]))
(defn make-selector
"Creates a blocking fan-in selector over multiple BlockingQueues."
[qs]
(let [;; unbounded => forwarders never block on put
out (LinkedBlockingQueue.)]
(doseq [q qs]
;; warning: these never exit causing mem leaks
(Thread/startVirtualThread
(fn []
(loop []
(.put out (.take ^LinkedBlockingQueue q))
(recur)))))
(fn []
(.take out))))
(def q1 (LinkedBlockingQueue. 10))
(def q2 (LinkedBlockingQueue. 10))
(Thread/startVirtualThread
(fn []
(loop []
(.offer q1 :foo)
(Thread/sleep 500)
(recur))))
(Thread/startVirtualThread
(fn []
(loop []
(.offer q2 :bar)
(Thread/sleep 500)
(recur))))
(Thread/startVirtualThread
(fn []
(Thread/sleep 5000)
(.offer q1 :done)))
;; prints :foo and :bar for 5 seconds then prints done and exits
(let [select (make-selector [q1 q2])]
(loop []
(let [item (select)]
(if (= item :done)
(println "done")
(do (println "got" item)
(recur))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment