Created
January 2, 2026 09:53
-
-
Save giuseppe998e/47489a0bdd5e0a30f23b34dd766ea629 to your computer and use it in GitHub Desktop.
Threaded processor that exposes async Sink/Stream interfaces for offloading blocking work
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use std::{ | |
| pin::Pin, | |
| sync::{Arc, Mutex}, | |
| task::{Context, Poll}, | |
| thread, | |
| }; | |
| use futures::{Sink, SinkExt as _, Stream}; | |
| use tokio::sync::mpsc; | |
| use tokio_util::sync::{PollSendError, PollSender}; | |
| /// Defines a blocking processor that can be driven from a dedicated worker thread. | |
| pub trait Processor: Send + 'static { | |
| type Input: Send; | |
| type Output: Send; | |
| /// Processes a single input value on the worker thread and yields its output. | |
| fn process(&mut self, input: Self::Input) -> Self::Output; | |
| } | |
| /// Couples an asynchronous Sink/Stream interface with a Processor running on its own thread. | |
| pub struct ThreadedWorker<P: Processor> { | |
| tx: PollSender<P::Input>, | |
| rx: mpsc::Receiver<P::Output>, | |
| guard: WorkerGuard, | |
| } | |
| impl<P: Processor> ThreadedWorker<P> { | |
| /// Spawns a dedicated thread that drives `processor` via bounded channels. | |
| /// The `buffer` argument defines the capacity of both input and output queues. | |
| pub fn spawn(mut processor: P, buffer: usize) -> Self { | |
| let (tx, mut worker_rx) = mpsc::channel::<P::Input>(buffer); | |
| let (worker_tx, rx) = mpsc::channel::<P::Output>(buffer); | |
| let handle = thread::spawn(move || { | |
| while let Some(input) = worker_rx.blocking_recv() { | |
| let output = processor.process(input); | |
| if worker_tx.blocking_send(output).is_err() { | |
| break; | |
| } | |
| } | |
| }); | |
| let tx = PollSender::new(tx); | |
| let guard = WorkerGuard::new(handle); | |
| Self { tx, rx, guard } | |
| } | |
| /// Splits the worker into independent Sink and Stream halves for sending inputs and receiving outputs. | |
| pub fn split(self) -> (ThreadedWorkerSink<P>, ThreadedWorkerStream<P>) { | |
| let ThreadedWorker { tx, rx, guard } = self; | |
| let sink = ThreadedWorkerSink { | |
| tx, | |
| guard: guard.clone(), | |
| }; | |
| let stream = ThreadedWorkerStream { rx, guard }; | |
| (sink, stream) | |
| } | |
| } | |
| impl<P: Processor> Sink<P::Input> for ThreadedWorker<P> { | |
| type Error = PollSendError<P::Input>; | |
| fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | |
| self.tx.poll_ready_unpin(cx) | |
| } | |
| fn start_send(mut self: Pin<&mut Self>, item: P::Input) -> Result<(), Self::Error> { | |
| self.tx.start_send_unpin(item) | |
| } | |
| fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | |
| self.tx.poll_flush_unpin(cx) | |
| } | |
| fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | |
| self.tx.poll_close_unpin(cx) | |
| } | |
| } | |
| impl<P: Processor> Stream for ThreadedWorker<P> { | |
| type Item = P::Output; | |
| fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
| Pin::new(&mut self.rx).poll_recv(cx) | |
| } | |
| } | |
| /// Sink half of a threaded worker that sends inputs to the processor. | |
| pub struct ThreadedWorkerSink<P: Processor> { | |
| tx: PollSender<P::Input>, | |
| guard: WorkerGuard, | |
| } | |
| impl<P: Processor> Sink<P::Input> for ThreadedWorkerSink<P> { | |
| type Error = PollSendError<P::Input>; | |
| fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | |
| self.tx.poll_ready_unpin(cx) | |
| } | |
| fn start_send(mut self: Pin<&mut Self>, item: P::Input) -> Result<(), Self::Error> { | |
| self.tx.start_send_unpin(item) | |
| } | |
| fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | |
| self.tx.poll_flush_unpin(cx) | |
| } | |
| fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | |
| self.tx.poll_close_unpin(cx) | |
| } | |
| } | |
| /// Stream half of a threaded worker that yields outputs produced by the processor. | |
| pub struct ThreadedWorkerStream<P: Processor> { | |
| rx: mpsc::Receiver<P::Output>, | |
| guard: WorkerGuard, | |
| } | |
| impl<P: Processor> Stream for ThreadedWorkerStream<P> { | |
| type Item = P::Output; | |
| fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
| Pin::new(&mut self.rx).poll_recv(cx) | |
| } | |
| } | |
| /// Shared guard that joins the worker thread when the last handle is dropped. | |
| #[derive(Clone)] | |
| struct WorkerGuard { | |
| handle: Arc<Mutex<Option<thread::JoinHandle<()>>>>, | |
| } | |
| impl WorkerGuard { | |
| fn new(handle: thread::JoinHandle<()>) -> Self { | |
| Self { | |
| handle: Arc::new(Mutex::new(Some(handle))), | |
| } | |
| } | |
| } | |
| impl Drop for WorkerGuard { | |
| fn drop(&mut self) { | |
| if let Ok(mut guard) = self.handle.lock() | |
| && let Some(handle) = guard.take() | |
| { | |
| let _ = handle.join(); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment