bufferpool
//! A concurrent buffer pool for managing fixed-size page buffers.
//!
//! This buffer pool provides efficient allocation and deallocation of fixed-size memory pages
//! using a bitmap-based allocation strategy. Key features include:
//!
//! - Concurrent access using atomic bitmaps and fine-grained per-bin locking
//! - Fixed-size page allocation
//! - Efficient bitmap-based tracking of free/used pages
//! - Three-level hierarchy for managing pages:
//!   - Bins (32 pages per bin)
//!   - Locks (16 bins per lock word)
//!   - Bit sieves (64 bins per sieve word) for quickly skipping full bins
//!
//! The implementation uses atomic operations and fine-grained locking to ensure thread safety
//! while maintaining high performance under concurrent allocations and deallocations.
//! Each page buffer is automatically returned to the pool when dropped.
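//!
//! A minimal usage sketch (illustrative only; the buffer count and page size below are
//! arbitrary example values):
//!
//! ```ignore
//! let pool = Arc::new(BufferPool::new(128, 4096));
//! let mut page = pool.get_page().expect("pool has a free page");
//! page.data_mut().fill(0);
//! // dropping `page` hands the buffer back to the pool
//! drop(page);
//! ```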
use std::ptr::NonNull;
use std::alloc::Layout;
use std::sync::atomic::AtomicU64;

use crate::types::PageHeader;
use crate::primitives::sync::Arc;
use crate::primitives::sync::atomic::{AtomicU32, Ordering};
use crate::primitives::sync::Mutex;
pub struct PageBuffer {
    pool: Arc<BufferPool>,
    ptr: NonNull<u8>,
}
/// Number of pages tracked by one bin (one bit per page in an `AtomicU32`).
const BIN_SIZE: usize = 32;
/// Number of bins guarded by one lock word (one bit per bin in a `Mutex<u16>`).
const LOCK_SIZE: usize = 16;
/// Number of bins tracked by one word of the full sieve (one bit per bin in an `AtomicU64`).
const BIT_SIEVES_SIZE: usize = 64;
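// Illustrative sizing (not used by the code): a pool of 2048 buffers, as in the
// `very_long` test below, gets 2048 / BIN_SIZE = 64 bins, (64 / LOCK_SIZE) + 1 = 5
// lock words, and (64 / BIT_SIEVES_SIZE) + 1 = 2 sieve words.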
impl PageBuffer {
    pub fn data_mut(&mut self) -> &mut [u8] {
        unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.pool.page_size) }
    }

    pub fn data(&self) -> &[u8] {
        unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.pool.page_size) }
    }
}

impl Drop for PageBuffer {
    fn drop(&mut self) {
        unsafe { self.pool.free(self.ptr) }
    }
}
unsafe impl Send for BufferPool {}
unsafe impl Sync for BufferPool {}

pub struct BufferPool {
    buffers: NonNull<u8>,
    /// bitset: a 1 bit means the page is free, a 0 bit means it is in use
    bins: Box<[AtomicU32]>,
    /// per-bin lock bits: bit `i` of word `w` guards bin `w * LOCK_SIZE + i`
    locks: Box<[Mutex<u16>]>,
    page_size: usize,
    /// bitset: a 1 bit means the corresponding bin has no free pages left
    full_sieve: Box<[AtomicU64]>,
}
impl BufferPool {
    /// Allocates `n_buffers` page buffers of `page_size` bytes each.
    pub fn new(n_buffers: usize, page_size: usize) -> Self {
        assert!(page_size.is_power_of_two());
        assert!(n_buffers.is_power_of_two());
        let buffers = unsafe {
            let size = page_size * n_buffers;
            let layout = Layout::from_size_align(size, std::mem::align_of::<PageHeader>()).unwrap();
            let ptr = std::alloc::alloc(layout);
            assert!(!ptr.is_null());
            NonNull::new(ptr).unwrap()
        };
        let n_bins = n_buffers.div_ceil(BIN_SIZE);
        let bins = (0..n_bins)
            .map(|i| {
                // only bits that map to real buffers start out free; for pools smaller
                // than BIN_SIZE the last bin is only partially populated
                let pages_in_bin = BIN_SIZE.min(n_buffers - i * BIN_SIZE);
                let free_mask = if pages_in_bin == BIN_SIZE {
                    u32::MAX
                } else {
                    (1u32 << pages_in_bin) - 1
                };
                AtomicU32::new(free_mask)
            })
            .collect::<Vec<_>>()
            .into_boxed_slice();
        let locks = std::iter::repeat_with(|| Mutex::new(0))
            .take((bins.len() / LOCK_SIZE) + 1)
            .collect::<Vec<_>>()
            .into_boxed_slice();
        let full_sieve = std::iter::repeat_with(|| AtomicU64::new(0))
            .take((bins.len() / BIT_SIEVES_SIZE) + 1)
            .collect::<Vec<_>>()
            .into_boxed_slice();
        Self {
            buffers,
            bins,
            locks,
            page_size,
            full_sieve,
        }
    }
    fn bin_lock_idx(&self, bin: usize) -> usize {
        bin / LOCK_SIZE
    }

    pub fn free_buffer_count(&self) -> usize {
        self.bins
            .iter()
            .map(|b| b.load(Ordering::Relaxed).count_ones() as usize)
            .sum::<usize>()
    }
    fn try_get_bin(self: &Arc<Self>, bin_idx: usize) -> Option<PageBuffer> {
        let bin = &self.bins[bin_idx];
        // acquire the per-bin lock
        self.acquire_lock(bin_idx);
        // take a buffer from the bin's free bitmap
        let mut free = bin.load(Ordering::Relaxed);
        if free == 0 {
            // the bin has no free pages left
            self.release_lock(bin_idx);
            return None;
        }
        // isolate the lowest set bit, i.e. the lowest-numbered free page
        let last_bit = (!free + 1) & free;
        // subtracting one turns that bit into a run of ones below it; counting them
        // gives the page's index within the bin
        let item = (last_bit - 1).count_ones();
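        // Worked example (illustrative): if free == 0b0110_0000, then
        // last_bit == 0b0010_0000, last_bit - 1 == 0b0001_1111, and item == 5,
        // i.e. the sixth page of this bin is handed out.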
        // clear that bit: the page is now in use
        free &= !last_bit;
        bin.store(free, Ordering::Relaxed);
        // the bin is now full, flag it in the sieve
        if free == 0 {
            self.set_full(bin_idx);
        }
        let buf_idx = (item as usize + bin_idx * BIN_SIZE) * self.page_size;
        let ptr = unsafe { self.buffers.offset(buf_idx as isize) };
        assert!((ptr.as_ptr() as *const PageHeader).is_aligned());
        let buf = PageBuffer {
            pool: self.clone(),
            ptr,
        };
        // release the per-bin lock
        self.release_lock(bin_idx);
        Some(buf)
    }
    /// Spin until the lock bit for `bin_idx` is clear, then set it.
    fn acquire_lock(&self, bin_idx: usize) {
        loop {
            let lock = &self.locks[self.bin_lock_idx(bin_idx)];
            let lock_bit = 1 << (bin_idx % LOCK_SIZE);
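            // Illustrative mapping: bin_idx 37 lives in lock word 37 / LOCK_SIZE = 2,
            // at bit 37 % LOCK_SIZE = 5.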
            let mut g = lock.lock().unwrap();
            let bits = *g;
            // the lock bit is available: claim it
            if bits & lock_bit == 0 {
                *g |= lock_bit;
                return;
            }
        }
    }

    /// The caller is expected to already hold the lock for `bin_idx`.
    fn release_lock(&self, bin_idx: usize) {
        let lock = &self.locks[self.bin_lock_idx(bin_idx)];
        let lock_bit = 1 << (bin_idx % LOCK_SIZE);
        let mut g = lock.lock().unwrap();
        *g &= !lock_bit;
    }
    pub fn set_full(&self, bin_idx: usize) {
        // the caller holds this bin's lock, so Relaxed ordering is sufficient here
        self.full_sieve[bin_idx / BIT_SIEVES_SIZE]
            .fetch_or(1 << (bin_idx % BIT_SIEVES_SIZE), Ordering::Relaxed);
    }

    pub fn clear_full(&self, bin_idx: usize) {
        // the caller holds this bin's lock, so Relaxed ordering is sufficient here
        self.full_sieve[bin_idx / BIT_SIEVES_SIZE]
            .fetch_and(!(1 << (bin_idx % BIT_SIEVES_SIZE)), Ordering::Relaxed);
    }
    pub fn get_page(self: &Arc<Self>) -> Option<PageBuffer> {
        // find a sieve word that still has at least one not-full bin
        let not_full = self
            .full_sieve
            .iter()
            .enumerate()
            .find(|(_, it)| it.load(Ordering::Relaxed) != u64::MAX);
        match not_full {
            Some((idx, s)) => {
                for i in 0..BIT_SIEVES_SIZE {
                    // todo: we can avoid this comparison
                    if s.load(Ordering::Relaxed) & (1 << i) == 0 {
                        let bin_idx = idx * BIT_SIEVES_SIZE + i;
                        if bin_idx < self.bins.len() {
                            if let Some(b) = self.try_get_bin(bin_idx) {
                                return Some(b);
                            }
                        } else {
                            // bits past the last real bin never map to buffers
                            return None;
                        }
                    }
                }
            }
            None => {
                dbg!("all bins are full");
            }
        }
        None
    }
    /// Safety: the caller must guarantee that there are no outstanding references to the slot,
    /// since it can be reused after this call.
    unsafe fn free(&self, ptr: NonNull<u8>) {
        let addr = ptr.as_ptr() as usize;
        let orig = self.buffers.as_ptr() as usize;
        let item = (addr - orig) / self.page_size;
        let bin_idx = item / BIN_SIZE;
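        // Illustrative: with a 4096-byte page size, a pointer 8192 bytes past the start
        // of the arena is page 2, which lives in bin 0 at bit 2.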
        // acquire the per-bin lock
        self.acquire_lock(bin_idx);
        let bin = &self.bins[bin_idx];
        let bits = bin.load(Ordering::Relaxed);
        // mark the page as free again
        bin.store(bits | (1 << (item % BIN_SIZE)), Ordering::Relaxed);
        // the bin was full: clear its sieve bit
        if bits == 0 {
            self.clear_full(bin_idx);
        }
        // release the per-bin lock
        self.release_lock(bin_idx);
    }
}
#[cfg(test)]
mod test {
    use std::{thread, time::Instant};

    use rand::random;

    use super::*;

    #[test]
    fn test() {
        let pool = Arc::new(BufferPool::new(128, 4096));
        let mut pages = Vec::new();
        for _ in 0..128 {
            let b = pool.get_page();
            assert!(b.is_some());
            pages.push(b.unwrap());
        }
        assert!(pool.get_page().is_none());
        pages.pop();
        assert!(pool.get_page().is_some());
    }
    #[test]
    fn very_long() {
        let pool = Arc::new(BufferPool::new(2048, 512));
        let mut ths = Vec::new();
        for i in 0..100 {
            let pool = pool.clone();
            let t = thread::spawn(move || {
                for _ in 0..100_000 {
                    let mut buf = pool.get_page().unwrap();
                    buf.data_mut().fill(i);
                    assert!(buf.data().iter().all(|it| *it == i));
                }
            });
            ths.push(t);
        }
        for t in ths {
            t.join().unwrap();
        }
    }
    #[test]
    fn random_fill() {
        let pool = Arc::new(BufferPool::new(1024 * 1024 * 16, 16));
        // scramble the free bitmaps, then mark the bins that ended up empty as full
        pool.bins.iter().for_each(|s| s.store(random(), Ordering::Relaxed));
        pool.bins.iter().enumerate().for_each(|(idx, bin)| {
            if bin.load(Ordering::Relaxed) == 0 {
                pool.set_full(idx);
            }
        });
        let before = Instant::now();
        let count = pool.free_buffer_count();
        let mut bufs = Vec::with_capacity(count);
        dbg!(count);
        while let Some(buf) = pool.get_page() {
            bufs.push(buf)
        }
        dbg!(bufs.len());
        dbg!(before.elapsed() / count as u32);
        dbg!(pool.full_sieve.iter().all(|f| f.load(Ordering::Relaxed) == u64::MAX));
    }
    #[cfg(loom)]
    #[test]
    fn concurrent_access() {
        loom::model(|| {
            let pool = Arc::new(BufferPool::new(4, 16));
            let mut ths = Vec::new();
            for i in 0..4 {
                let pool = pool.clone();
                let h = loom::thread::spawn(move || {
                    loop {
                        let Some(mut buf) = pool.get_page() else {
                            loom::thread::yield_now();
                            continue;
                        };
                        buf.data_mut().fill(i);
                        assert!(buf.data().iter().all(|it| *it == i));
                        break;
                    }
                });
                ths.push(h);
            }
            for t in ths {
                t.join().unwrap();
            }
        })
    }
}