@MarinPostma
Created November 19, 2024 15:34
bufferpool

//! A concurrent buffer pool implementation for managing fixed-size page buffers.
//!
//! This buffer pool provides efficient allocation and deallocation of fixed-size memory pages
//! using a bitmap-based allocation strategy. Key features include:
//!
//! - Concurrent access using atomic bitmaps and fine-grained bit locks
//! - Fixed-size page allocation
//! - Efficient bitmap-based tracking of free/used pages
//! - Three-level hierarchy for managing pages:
//!   - Bins (32 pages per bin)
//!   - Locks (16 bins per lock word)
//!   - Bit sieves (64 bins per sieve word) for quick full/not-full status
//!
//! The implementation combines atomic operations with fine-grained locking to ensure thread
//! safety while maintaining high performance for concurrent allocations and deallocations.
//! Each page buffer is automatically returned to the pool when dropped.
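//!
//! A minimal usage sketch (illustrative only; it just exercises the public API defined
//! below, with an assumed 128-page pool of 4 KiB pages):
//!
//! ```ignore
//! let pool = Arc::new(BufferPool::new(128, 4096));
//!
//! // Grab a page, write to it, and read it back.
//! let mut page = pool.get_page().expect("pool has free pages");
//! page.data_mut().fill(0xAB);
//! assert!(page.data().iter().all(|b| *b == 0xAB));
//!
//! // Dropping the buffer returns it to the pool.
//! drop(page);
//! assert_eq!(pool.free_buffer_count(), 128);
//! ```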
use std::ptr::NonNull;
use std::alloc::Layout;
use std::sync::atomic::AtomicU64;
use crate::types::PageHeader;
use crate::primitives::sync::Arc;
use crate::primitives::sync::atomic::{AtomicU32, Ordering};
use crate::primitives::sync::Mutex;

/// An exclusively owned page buffer handed out by [`BufferPool::get_page`].
pub struct PageBuffer {
    pool: Arc<BufferPool>,
    ptr: NonNull<u8>,
}

/// Pages per bin (one `AtomicU32` free bitmap per bin).
const BIN_SIZE: usize = 32;
/// Bins per lock word (one `Mutex<u16>` holds 16 bin-lock bits).
const LOCK_SIZE: usize = 16;
/// Bins per sieve word (one `AtomicU64` tracks the full/not-full state of 64 bins).
const BIT_SIEVES_SIZE: usize = 64;
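
// Rough sizing (derived from the constructor below): a pool of N pages uses N / 32 bin
// bitmaps, about N / 512 lock words, and about N / 2048 sieve words.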

impl PageBuffer {
    pub fn data_mut(&mut self) -> &mut [u8] {
        unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.pool.page_size) }
    }

    pub fn data(&self) -> &[u8] {
        unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.pool.page_size) }
    }
}

impl Drop for PageBuffer {
    fn drop(&mut self) {
        unsafe { self.pool.free(self.ptr) }
    }
}

unsafe impl Send for BufferPool {}
unsafe impl Sync for BufferPool {}

pub struct BufferPool {
    buffers: NonNull<u8>,
    /// bitsets: a 1 bit means the page is free, a 0 bit means it is in use
    bins: Box<[AtomicU32]>,
    /// bit locks: each `u16` holds one lock bit per bin (16 bins per lock word)
    locks: Box<[Mutex<u16>]>,
    page_size: usize,
    /// bitset: a 1 bit means the corresponding bin has no free page left
    full_sieve: Box<[AtomicU64]>,
}

impl BufferPool {
    /// Allocates a pool of `n_buffers` page buffers of `page_size` bytes each.
    pub fn new(n_buffers: usize, page_size: usize) -> Self {
        assert!(page_size.is_power_of_two());
        assert!(n_buffers.is_power_of_two());

        let buffers = unsafe {
            let size = page_size * n_buffers;
            let layout = Layout::from_size_align(size, std::mem::align_of::<PageHeader>()).unwrap();
            let ptr = std::alloc::alloc(layout);
            assert!(!ptr.is_null());
            NonNull::new(ptr).unwrap()
        };

        // Round up so pools smaller than one bin still work; in a partial last bin,
        // only the bits of pages that actually exist are marked free.
        let n_bins = n_buffers.div_ceil(BIN_SIZE);
        let bins = (0..n_bins)
            .map(|bin| {
                let pages_in_bin = (n_buffers - bin * BIN_SIZE).min(BIN_SIZE);
                let mask = if pages_in_bin == BIN_SIZE { u32::MAX } else { (1u32 << pages_in_bin) - 1 };
                AtomicU32::new(mask)
            })
            .collect::<Vec<_>>()
            .into_boxed_slice();
        let locks = std::iter::repeat_with(|| Mutex::new(0))
            .take((bins.len() / LOCK_SIZE) + 1)
            .collect::<Vec<_>>()
            .into_boxed_slice();
        let full_sieve = std::iter::repeat_with(|| AtomicU64::new(0))
            .take((bins.len() / BIT_SIEVES_SIZE) + 1)
            .collect::<Vec<_>>()
            .into_boxed_slice();

        Self {
            buffers,
            bins,
            locks,
            page_size,
            full_sieve,
        }
    }

    fn bin_lock_idx(&self, bin: usize) -> usize {
        bin / LOCK_SIZE
    }

    pub fn free_buffer_count(&self) -> usize {
        self.bins
            .iter()
            .map(|b| b.load(Ordering::Relaxed).count_ones() as usize)
            .sum::<usize>()
    }

    fn try_get_bin(self: &Arc<Self>, bin_idx: usize) -> Option<PageBuffer> {
        let bin = &self.bins[bin_idx];
        // acquire the bit lock for this bin
        self.acquire_lock(bin_idx);
        // take a buffer from the bin's free bitmap
        let mut free = bin.load(Ordering::Relaxed);
        if free == 0 {
            // the bin is already full: no free page left
            self.release_lock(bin_idx);
            return None;
        }
        // isolate the lowest set bit (equivalent to `free & free.wrapping_neg()`)
        let last_bit = (!free + 1) & free;
        // subtracting one turns it into a run of trailing ones; counting them gives
        // the page index within the bin
        let item = (last_bit - 1).count_ones();
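        // Worked example (illustrative values): free = 0b0110 -> last_bit = 0b0010,
        // last_bit - 1 = 0b0001, count_ones() = 1, so this claims page 1 of the bin.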
        // unset the claimed bit in the bin's bitmap
        free &= !last_bit;
        bin.store(free, Ordering::Relaxed);
        // the bin just became full: record it in the sieve
        if free == 0 {
            self.set_full(bin_idx);
        }
        let buf_idx = (item as usize + bin_idx * BIN_SIZE) * self.page_size;
        let ptr = unsafe { self.buffers.offset(buf_idx as isize) };
        assert!((ptr.as_ptr() as *const PageHeader).is_aligned());
        let buf = PageBuffer {
            pool: self.clone(),
            ptr,
        };
        // release the bit lock before handing out the buffer
        self.release_lock(bin_idx);
        Some(buf)
    }
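
    // Design note: LOCK_SIZE bins share a single `Mutex<u16>`, with one lock bit per bin.
    // `acquire_lock` holds the mutex only long enough to test-and-set its bin's bit, so
    // bins that share a lock word only contend briefly on the mutex itself.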

    fn acquire_lock(&self, bin_idx: usize) {
        // spin until the lock bit for this bin can be taken
        loop {
            let lock = &self.locks[self.bin_lock_idx(bin_idx)];
            let lock_bit = 1 << (bin_idx % LOCK_SIZE);
            let mut g = lock.lock().unwrap();
            let bits = *g;
            // the lock bit is available: take it
            if bits & lock_bit == 0 {
                *g |= lock_bit;
                return;
            }
        }
    }

    /// The caller is expected to already hold the lock bit for `bin_idx`.
    fn release_lock(&self, bin_idx: usize) {
        let lock = &self.locks[self.bin_lock_idx(bin_idx)];
        let lock_bit = 1 << (bin_idx % LOCK_SIZE);
        let mut g = lock.lock().unwrap();
        *g &= !lock_bit;
    }

    pub fn set_full(&self, bin_idx: usize) {
        // the bin lock serialises updates to this sieve bit, so the memory ordering doesn't matter much
        self.full_sieve[bin_idx / BIT_SIEVES_SIZE]
            .fetch_or(1 << (bin_idx % BIT_SIEVES_SIZE), Ordering::Relaxed);
    }

    pub fn clear_full(&self, bin_idx: usize) {
        // the bin lock serialises updates to this sieve bit, so the memory ordering doesn't matter much
        self.full_sieve[bin_idx / BIT_SIEVES_SIZE]
            .fetch_and(!(1 << (bin_idx % BIT_SIEVES_SIZE)), Ordering::Relaxed);
    }

    pub fn get_page(self: &Arc<Self>) -> Option<PageBuffer> {
        // find a sieve word with at least one bin that is not full
        let not_full = self
            .full_sieve
            .iter()
            .enumerate()
            .find(|(_, it)| it.load(Ordering::Relaxed) != u64::MAX);
        match not_full {
            Some((idx, s)) => {
                for i in 0..BIT_SIEVES_SIZE {
                    // todo: we can avoid this comparison
                    if s.load(Ordering::Relaxed) & (1 << i) == 0 {
                        let bin_idx = idx * BIT_SIEVES_SIZE + i;
                        if bin_idx < self.bins.len() {
                            if let Some(b) = self.try_get_bin(bin_idx) {
                                return Some(b);
                            }
                        } else {
                            // past the last real bin: the remaining sieve bits are padding
                            return None;
                        }
                    }
                }
            }
            None => {
                dbg!("all bins are full");
            }
        }
        None
    }

    /// Safety: the caller must guarantee that there are no outstanding references to the slot,
    /// since it can be reused after this call.
    unsafe fn free(&self, ptr: NonNull<u8>) {
        let addr = ptr.as_ptr() as usize;
        let orig = self.buffers.as_ptr() as usize;
        let item = (addr - orig) / self.page_size;
        let bin_idx = item / BIN_SIZE;
        // acquire the bit lock for this bin
        self.acquire_lock(bin_idx);
        let bin = &self.bins[bin_idx];
        let bits = bin.load(Ordering::Relaxed);
        // mark the page as free again
        bin.store(bits | (1 << (item % BIN_SIZE)), Ordering::Relaxed);
        // the bin was full: clear its sieve bit
        if bits == 0 {
            self.clear_full(bin_idx);
        }
        // release the bit lock
        self.release_lock(bin_idx);
    }
}

#[cfg(test)]
mod test {
    use std::{thread, time::Instant};

    use rand::random;

    use super::*;

    #[test]
    fn test() {
        let pool = Arc::new(BufferPool::new(128, 4096));
        let mut pages = Vec::new();
        for _ in 0..128 {
            let b = pool.get_page();
            assert!(b.is_some());
            pages.push(b.unwrap());
        }
        assert!(pool.get_page().is_none());
        pages.pop();
        assert!(pool.get_page().is_some());
    }

    #[test]
    fn very_long() {
        let pool = Arc::new(BufferPool::new(2048, 512));
        let mut ths = Vec::new();
        for i in 0..100 {
            let pool = pool.clone();
            let t = thread::spawn(move || {
                for _ in 0..100_000 {
                    let mut buf = pool.get_page().unwrap();
                    buf.data_mut().fill(i);
                    assert!(buf.data().iter().all(|it| *it == i));
                }
            });
            ths.push(t);
        }
        for t in ths {
            t.join().unwrap();
        }
    }

    #[test]
    fn random_fill() {
        let pool = Arc::new(BufferPool::new(1024 * 1024 * 16, 16));
        pool.bins
            .iter()
            .for_each(|s| s.store(random(), Ordering::Relaxed));
        pool.bins.iter().enumerate().for_each(|(idx, bin)| {
            if bin.load(Ordering::Relaxed) == 0 {
                pool.set_full(idx);
            }
        });
        let before = Instant::now();
        let count = pool.free_buffer_count();
        let mut bufs = Vec::with_capacity(count);
        dbg!(count);
        while let Some(buf) = pool.get_page() {
            bufs.push(buf)
        }
        dbg!(bufs.len());
        dbg!(before.elapsed() / count as u32);
        dbg!(pool.full_sieve.iter().all(|f| f.load(Ordering::Relaxed) == u64::MAX));
    }

    #[cfg(loom)]
    #[test]
    fn concurrent_access() {
        loom::model(|| {
            let pool = Arc::new(BufferPool::new(4, 16));
            let mut ths = Vec::new();
            for i in 0..4 {
                let pool = pool.clone();
                let h = loom::thread::spawn(move || {
                    loop {
                        let Some(mut buf) = pool.get_page() else {
                            loom::thread::yield_now();
                            continue;
                        };
                        buf.data_mut().fill(i);
                        assert!(buf.data().iter().all(|it| *it == i));
                        break;
                    }
                });
                ths.push(h);
            }
            for t in ths {
                t.join().unwrap();
            }
        })
    }
}