Skip to content

Instantly share code, notes, and snippets.

@giuseppe998e
Created January 2, 2026 09:53
Show Gist options
  • Select an option

  • Save giuseppe998e/47489a0bdd5e0a30f23b34dd766ea629 to your computer and use it in GitHub Desktop.

Select an option

Save giuseppe998e/47489a0bdd5e0a30f23b34dd766ea629 to your computer and use it in GitHub Desktop.
Threaded processor that exposes async Sink/Stream interfaces for offloading blocking work
use std::{
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
thread,
};
use futures::{Sink, SinkExt as _, Stream};
use tokio::sync::mpsc;
use tokio_util::sync::{PollSendError, PollSender};
/// Defines a blocking processor that can be driven from a dedicated worker thread.
///
/// The `Send + 'static` supertraits allow an implementor to be moved into the closure
/// handed to `std::thread::spawn`, which is how `ThreadedWorker::spawn` consumes it.
pub trait Processor: Send + 'static {
/// Type of the values fed into [`Processor::process`]; `Send` because inputs cross a
/// channel into the worker thread.
type Input: Send;
/// Type of the values produced by [`Processor::process`]; `Send` because outputs cross a
/// channel back out of the worker thread.
type Output: Send;
/// Processes a single input value on the worker thread and yields its output.
///
/// Takes `&mut self`, so an implementation may keep state between successive calls.
fn process(&mut self, input: Self::Input) -> Self::Output;
}
/// Couples an asynchronous Sink/Stream interface with a Processor running on its own thread.
///
/// Field order is significant: Rust drops struct fields in declaration order, so both
/// channel endpoints (`tx`, `rx`) are released before `guard`, whose destructor joins the
/// worker thread. Keep `guard` as the last field.
pub struct ThreadedWorker<P: Processor> {
/// Sending side of the input channel, wrapped in `PollSender` so it can back the `Sink` impl.
tx: PollSender<P::Input>,
/// Receiving side of the output channel that the worker thread sends results into.
rx: mpsc::Receiver<P::Output>,
/// Owns the worker thread's join handle; never read directly, it exists for its destructor.
guard: WorkerGuard,
}
impl<P: Processor> ThreadedWorker<P> {
    /// Starts a dedicated OS thread that runs `processor` and returns the async front-end for it.
    ///
    /// Two bounded Tokio channels, each created with capacity `buffer`, connect the caller
    /// to the thread: one carries inputs in, the other carries outputs back. The thread
    /// keeps looping until the input channel yields `None` or sending an output fails
    /// because the receiving side is gone.
    ///
    /// # Panics
    ///
    /// `tokio::sync::mpsc::channel` is documented to panic when `buffer` is zero.
    pub fn spawn(mut processor: P, buffer: usize) -> Self {
        let (input_tx, mut input_rx) = mpsc::channel::<P::Input>(buffer);
        let (output_tx, output_rx) = mpsc::channel::<P::Output>(buffer);

        let worker = thread::spawn(move || loop {
            // `None` means every sender is gone and the queue is empty: nothing left to do.
            let Some(job) = input_rx.blocking_recv() else {
                return;
            };
            // A failed send means nobody is listening for results any more.
            if output_tx.blocking_send(processor.process(job)).is_err() {
                return;
            }
        });

        Self {
            tx: PollSender::new(input_tx),
            rx: output_rx,
            guard: WorkerGuard::new(worker),
        }
    }

    /// Breaks the worker apart into a Sink half (inputs) and a Stream half (outputs).
    ///
    /// Each half receives its own handle to the shared worker guard: the sink gets a
    /// clone, the stream takes over the original.
    pub fn split(self) -> (ThreadedWorkerSink<P>, ThreadedWorkerStream<P>) {
        let sink = ThreadedWorkerSink {
            guard: self.guard.clone(),
            tx: self.tx,
        };
        let stream = ThreadedWorkerStream {
            rx: self.rx,
            guard: self.guard,
        };
        (sink, stream)
    }
}
/// Sink side of the combined worker: every operation is forwarded unchanged to the
/// wrapped `PollSender` for the input channel.
impl<P: Processor> Sink<P::Input> for ThreadedWorker<P> {
    type Error = PollSendError<P::Input>;

    /// Forwards the readiness check to the input channel's sender.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();
        this.tx.poll_ready_unpin(cx)
    }

    /// Hands `item` to the input channel's sender.
    fn start_send(self: Pin<&mut Self>, item: P::Input) -> Result<(), Self::Error> {
        let this = self.get_mut();
        this.tx.start_send_unpin(item)
    }

    /// Forwards the flush request to the input channel's sender.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();
        this.tx.poll_flush_unpin(cx)
    }

    /// Forwards the close request to the input channel's sender.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();
        this.tx.poll_close_unpin(cx)
    }
}
/// Stream side of the combined worker: items are whatever arrives on the output channel.
impl<P: Processor> Stream for ThreadedWorker<P> {
    type Item = P::Output;

    /// Polls the output channel's receiver for the next processed value.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.rx.poll_recv(cx)
    }
}
/// Sink half of a threaded worker that sends inputs to the processor.
///
/// Field order is significant: Rust drops struct fields in declaration order, so `tx` is
/// released before `guard`. Keep `guard` as the last field.
pub struct ThreadedWorkerSink<P: Processor> {
/// Sending side of the input channel, wrapped in `PollSender` so it can back the `Sink` impl.
tx: PollSender<P::Input>,
/// Handle to the shared worker guard; never read directly, it exists for its destructor.
guard: WorkerGuard,
}
/// Every operation is forwarded unchanged to the wrapped `PollSender` for the input channel.
impl<P: Processor> Sink<P::Input> for ThreadedWorkerSink<P> {
    type Error = PollSendError<P::Input>;

    /// Forwards the readiness check to the input channel's sender.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();
        this.tx.poll_ready_unpin(cx)
    }

    /// Hands `item` to the input channel's sender.
    fn start_send(self: Pin<&mut Self>, item: P::Input) -> Result<(), Self::Error> {
        let this = self.get_mut();
        this.tx.start_send_unpin(item)
    }

    /// Forwards the flush request to the input channel's sender.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();
        this.tx.poll_flush_unpin(cx)
    }

    /// Forwards the close request to the input channel's sender.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();
        this.tx.poll_close_unpin(cx)
    }
}
/// Stream half of a threaded worker that yields outputs produced by the processor.
///
/// Field order is significant: Rust drops struct fields in declaration order, so `rx` is
/// released before `guard`. Keep `guard` as the last field.
pub struct ThreadedWorkerStream<P: Processor> {
/// Receiving side of the output channel that the worker thread sends results into.
rx: mpsc::Receiver<P::Output>,
/// Handle to the shared worker guard; never read directly, it exists for its destructor.
guard: WorkerGuard,
}
/// Items are whatever arrives on the output channel.
impl<P: Processor> Stream for ThreadedWorkerStream<P> {
    type Item = P::Output;

    /// Polls the output channel's receiver for the next processed value.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.rx.poll_recv(cx)
    }
}
/// Shared guard that joins the worker thread when the last handle is dropped.
///
/// The guard is cheap to clone; all clones share one join handle. Dropping a clone while
/// other clones are still alive does nothing. Only the destructor of the final clone
/// blocks on `JoinHandle::join`, so the owner of one clone is never stalled waiting for
/// a thread that the owner of another clone is still using.
#[derive(Clone)]
struct WorkerGuard {
    /// Join handle shared by every clone; `None` once it has been taken for joining.
    handle: Arc<Mutex<Option<thread::JoinHandle<()>>>>,
}

impl WorkerGuard {
    /// Wraps `handle` so that it can be shared between clones of the guard.
    fn new(handle: thread::JoinHandle<()>) -> Self {
        Self {
            handle: Arc::new(Mutex::new(Some(handle))),
        }
    }
}

impl Drop for WorkerGuard {
    /// Joins the thread if, and only if, this is the last surviving clone.
    fn drop(&mut self) {
        // `drop` only gets `&mut self`, so swap an empty placeholder into the field to
        // obtain the shared `Arc` by value.
        let shared = std::mem::take(&mut self.handle);

        // `Arc::into_inner` returns the value to exactly one caller even when clones are
        // dropped concurrently, so precisely one destructor performs the join.
        let Some(slot) = Arc::into_inner(shared) else {
            return;
        };

        // We own the mutex outright here; a poisoned lock must not prevent the join.
        let handle = slot
            .into_inner()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        if let Some(handle) = handle {
            // `join` reports a panic of the thread as `Err`; a destructor has no way
            // to propagate it, so it is deliberately ignored.
            let _ = handle.join();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment