Skip to content

Instantly share code, notes, and snippets.

@Elanza-48
Last active November 13, 2025 10:22
Show Gist options
  • Select an option

  • Save Elanza-48/d8ad3bae8ab22cfee7dc30afed2d3c77 to your computer and use it in GitHub Desktop.

Select an option

Save Elanza-48/d8ad3bae8ab22cfee7dc30afed2d3c77 to your computer and use it in GitHub Desktop.
Java fixed-size byte buffer pool
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
/**
* ByteBuffer pool with blocking acquire when pool is exhausted
*/
@Slf4j
public class ByteBufferPool {
private static final long ONE_MB = 1024 * 1024L;
private static final int DEFAULT_ACQUISITION_TIME_IN_MINUTES = 5;
private final ConcurrentLinkedQueue<BufferWrapper> availableBuffers = new ConcurrentLinkedQueue<>();
private final Set<Integer> registeredBufferIdentities = ConcurrentHashMap.newKeySet();
private final Set<Integer> inUseBufferIdentities = ConcurrentHashMap.newKeySet();
private final AtomicInteger totalBuffers = new AtomicInteger(0);
private final int maxBuffers;
private final int minBuffers;
private final int bufferSize;
private final long idleTimeoutMs;
private final ReentrantLock lock = new ReentrantLock();
private final Condition bufferAvailable = lock.newCondition();
private final ScheduledExecutorService cleanupExecutor;
private final String poolInstanceId = generatePoolId();
private volatile boolean shutdown = false;
private static class BufferWrapper {
final ByteBuffer buffer;
final int bufferIdentity;
volatile long lastUsedTime;
BufferWrapper(ByteBuffer buffer) {
this.buffer = buffer;
this.bufferIdentity = System.identityHashCode(buffer);
this.lastUsedTime = System.currentTimeMillis();
}
long getIdleTimeMs() {
return System.currentTimeMillis() - lastUsedTime;
}
}
/**
* Exception thrown when buffer ownership validation fails
*/
public static class InvalidBufferException extends IllegalArgumentException {
public InvalidBufferException(String message) {
super(message);
}
}
public ByteBufferPool(int maxBuffers, int bufferSizeMB) {
this(Math.max(1, maxBuffers / 4), maxBuffers, bufferSizeMB, 300);
}
public ByteBufferPool(int minBuffers, int maxBuffers, int bufferSizeMB, int idleTimeoutSeconds) {
this.maxBuffers = maxBuffers;
this.minBuffers = Math.min(minBuffers, maxBuffers);
this.bufferSize = bufferSizeMB * (int) ONE_MB;
this.idleTimeoutMs = TimeUnit.SECONDS.toMillis(idleTimeoutSeconds);
// Start cleanup task
this.cleanupExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "ByteBufferPool-Cleanup-" + poolInstanceId.substring(0, 8));
t.setDaemon(true);
return t;
});
this.cleanupExecutor.scheduleAtFixedRate(
this::cleanupIdleBuffers,
idleTimeoutSeconds,
2 * 60L,
TimeUnit.SECONDS
);
log.debug("[ByteBufferPool] Initialized pool {}: max={}, min={}, bufferSize={}MB, timeout={}s",
poolInstanceId.substring(0, 8), maxBuffers, minBuffers, bufferSizeMB, idleTimeoutSeconds
);
}
/**
* Acquire a buffer from the pool with ownership tracking
*/
public ByteBuffer acquire() throws TimeoutException, InterruptedException {
if (shutdown) {
throw new IllegalStateException("ByteBufferPool is shut down");
}
// Try to get from pool first
BufferWrapper wrapper = availableBuffers.poll();
if (wrapper != null && inUseBufferIdentities.add(wrapper.bufferIdentity)) {
return wrapper.buffer;
}
// Try to create a new buffer if under limit
if (totalBuffers.get() < maxBuffers) {
lock.lock();
try {
// Double-check after acquiring lock
if (totalBuffers.get() < maxBuffers) {
BufferWrapper newWrapper = createBuffer();
if (newWrapper != null && inUseBufferIdentities.add(newWrapper.bufferIdentity)) {
return newWrapper.buffer;
}
}
} finally {
lock.unlock();
}
}
// Wait for a buffer to be released
lock.lock();
try {
while (!shutdown) {
wrapper = availableBuffers.poll();
if (wrapper != null && inUseBufferIdentities.add(wrapper.bufferIdentity)) {
return wrapper.buffer;
}
if (!bufferAvailable.await(DEFAULT_ACQUISITION_TIME_IN_MINUTES, TimeUnit.MINUTES)) {
throw new TimeoutException("Timeout waiting for ByteBuffer from pool");
}
}
throw new IllegalStateException("ByteBufferPool is shut down");
} finally {
lock.unlock();
}
}
/**
* Acquire with timeout
*/
public ByteBuffer acquire(long timeout, TimeUnit unit) throws InterruptedException {
if (shutdown) {
throw new IllegalStateException("ByteBufferPool is shut down");
}
long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
// Try to get from pool first
BufferWrapper wrapper = availableBuffers.poll();
if (wrapper != null && inUseBufferIdentities.add(wrapper.bufferIdentity)) {
return wrapper.buffer;
}
// Try to create a new buffer if under limit
if (totalBuffers.get() < maxBuffers) {
lock.lock();
try {
if (totalBuffers.get() < maxBuffers) {
BufferWrapper newWrapper = createBuffer();
if (newWrapper != null && inUseBufferIdentities.add(newWrapper.bufferIdentity)) {
return newWrapper.buffer;
}
}
} finally {
lock.unlock();
}
}
// Wait for a buffer to be released
lock.lock();
try {
while (!shutdown) {
wrapper = availableBuffers.poll();
if (wrapper != null && inUseBufferIdentities.add(wrapper.bufferIdentity)) {
return wrapper.buffer;
}
long remainingNanos = deadlineNanos - System.nanoTime();
if (remainingNanos <= 0) {
return null; // Timeout
}
bufferAvailable.awaitNanos(remainingNanos);
}
throw new IllegalStateException("ByteBufferPool is shut down");
} finally {
lock.unlock();
}
}
/**
* Release a buffer back to the pool WITH VALIDATION
*/
public void release(ByteBuffer buffer) {
if (buffer == null || shutdown) {
return;
}
int bufferIdentity = System.identityHashCode(buffer);
// CRITICAL SECURITY CHECK #1: Is this buffer from our pool?
if (!registeredBufferIdentities.contains(bufferIdentity)) {
throw new InvalidBufferException(String.format(
"Buffer release rejected: Buffer (identity=%d) does not belong to the pool (instance=%s). " +
"This may indicate a security issue or double-release attempt.",
bufferIdentity, poolInstanceId.substring(0, 8)
));
}
// CRITICAL SECURITY CHECK #2: Is this buffer currently in use?
if (!inUseBufferIdentities.remove(bufferIdentity)) {
throw new InvalidBufferException(String.format(
"Buffer release rejected: Buffer (identity=%d) is not currently in use. " +
"This may indicate a double-release attempt.",
bufferIdentity
));
}
// All checks passed - release is valid
buffer.clear();
availableBuffers.offer(new BufferWrapper(buffer));
lock.lock();
try {
bufferAvailable.signal();
} finally {
lock.unlock();
}
}
/**
* Create a new buffer and register it
*/
private BufferWrapper createBuffer() {
try {
ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
BufferWrapper wrapper = new BufferWrapper(buffer);
// Register this buffer's identity
registeredBufferIdentities.add(wrapper.bufferIdentity);
totalBuffers.incrementAndGet();
log.debug("[ByteBufferPool] Created buffer: {}", wrapper);
return wrapper;
} catch (OutOfMemoryError e) {
log.error("[ByteBufferPool] Failed to allocate buffer - {} | pool id - {}", e.getMessage(), poolInstanceId);
return null;
}
}
/**
* Cleanup idle buffers
*/
private void cleanupIdleBuffers() {
if (shutdown) {
return;
}
int currentTotal = totalBuffers.get();
if (currentTotal <= minBuffers) {
return;
}
int cleaned = 0;
BufferWrapper wrapper;
while ((wrapper = availableBuffers.poll()) != null && currentTotal > minBuffers) {
long idleTime = wrapper.getIdleTimeMs();
if (idleTime > idleTimeoutMs && currentTotal > minBuffers) {
// Destroy this buffer
registeredBufferIdentities.remove(wrapper.bufferIdentity);
totalBuffers.decrementAndGet();
currentTotal--;
cleaned++;
// Help GC by clearing the buffer
wrapper.buffer.clear();
log.debug("[ByteBufferPool] Destroyed buffer: {} (idle for {}s) | pool id - {}",
wrapper, wrapper.getIdleTimeMs() / 1000, poolInstanceId);
} else {
availableBuffers.offer(wrapper);
}
}
if (cleaned > 0) {
log.debug("[ByteBufferPool] Cleanup complete. Destroyed {} buffers. Current: {}/{} (min: {})",
cleaned, currentTotal, maxBuffers, minBuffers);
System.gc();
}
}
/**
* Force cleanup of all idle buffers above minimum
*/
public void forceCleanup() {
cleanupIdleBuffers();
}
/**
* Validate if a buffer belongs to this pool (without throwing exception)
*/
public boolean isValidPoolBuffer(ByteBuffer buffer) {
if (buffer == null) {
return false;
}
int identity = System.identityHashCode(buffer);
return registeredBufferIdentities.contains(identity);
}
/**
* Shutdown the pool and release all resources
*/
public void shutdown() {
shutdown = true;
cleanupExecutor.shutdown();
lock.lock();
try {
bufferAvailable.signalAll();
} finally {
lock.unlock();
}
// Clear all buffers to allow GC
BufferWrapper wrapper;
while ((wrapper = availableBuffers.poll()) != null) {
wrapper.buffer.clear();
}
registeredBufferIdentities.clear();
inUseBufferIdentities.clear();
totalBuffers.set(0);
try {
if (!cleanupExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
cleanupExecutor.shutdownNow();
}
} catch (InterruptedException e) {
cleanupExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}
/**
* Executes the provided action with an automatically managed buffer.
* The buffer is acquired, cleared, passed to the function, and always released,
* even if the function throws an exception.
*/
public <T> T withBuffer(Function<ByteBuffer, T> action) {
ByteBuffer buffer = null;
boolean acquired = false;
try {
buffer = this.acquire(); // use your existing acquire()
acquired = true;
buffer.clear();
return action.apply(buffer);
} catch (TimeoutException e) {
throw new RuntimeException("Timeout while acquiring buffer from pool", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while acquiring buffer from pool", e);
} finally {
if (acquired && buffer != null) {
try {
this.release(buffer);
} catch (InvalidBufferException ex) {
log.error("[ByteBufferPool] Failed to release buffer: {}", ex.getMessage());
}
}
}
}
/**
* Executes the provided async action with an automatically managed buffer.
* The buffer is acquired and released when the returned CompletableFuture completes.
*/
public <T> CompletableFuture<T> withBufferAsync(Function<ByteBuffer, CompletableFuture<T>> asyncAction) {
ByteBuffer buffer;
try {
buffer = this.acquire();
buffer.clear();
} catch (TimeoutException e) {
return CompletableFuture.failedFuture(
new RuntimeException("Timeout while acquiring buffer from pool", e)
);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return CompletableFuture.failedFuture(
new RuntimeException("Interrupted while acquiring buffer from pool", e)
);
}
CompletableFuture<T> resultFuture;
try {
resultFuture = asyncAction.apply(buffer);
} catch (Throwable t) {
// if the function itself throws before returning a future
releaseSafe(buffer);
return CompletableFuture.failedFuture(t);
}
// Ensure release when the async operation completes
return resultFuture.whenComplete((res, ex) -> releaseSafe(buffer));
}
/**
* Internal helper to safely release a buffer without throwing
*/
private void releaseSafe(ByteBuffer buffer) {
try {
release(buffer);
} catch (Exception e) {
log.error("[ByteBufferPool] Safe release failed: {}", e.getMessage());
}
}
/**
* Get pool statistics with security info
*/
public PoolStats getStats() {
return new PoolStats(
totalBuffers.get(),
availableBuffers.size(),
maxBuffers,
minBuffers,
bufferSize,
idleTimeoutMs,
poolInstanceId
);
}
/**
* Generate unique pool instance ID
*/
private static String generatePoolId() {
return String.format("%016x", new SecureRandom().nextLong());
}
public static class PoolStats {
public final int totalBuffers;
public final int availableBuffers;
public final int maxBuffers;
public final int minBuffers;
public final int bufferSize;
public final long idleTimeoutMs;
public final String poolInstanceId;
public PoolStats(int total, int available, int max, int min,
int size, long timeout, String instanceId) {
this.totalBuffers = total;
this.availableBuffers = available;
this.maxBuffers = max;
this.minBuffers = min;
this.bufferSize = size;
this.idleTimeoutMs = timeout;
this.poolInstanceId = instanceId;
}
public long getTotalMemoryMB() {
return (long) totalBuffers * bufferSize / (1024 * 1024);
}
public long getAvailableMemoryMB() {
return (long) availableBuffers * bufferSize / (1024 * 1024);
}
@Override
public String toString() {
return String.format(
"Pool[id=%s, total=%d, available=%d, inUse=%d, max=%d, min=%d, " +
"bufferSize=%dMB, totalMem=%dMB, inUseMem=%dMB, idleTimeout=%ds]",
poolInstanceId.substring(0, 8),
totalBuffers, availableBuffers, maxBuffers, minBuffers,
bufferSize / (1024 * 1024), getTotalMemoryMB(), idleTimeoutMs / 1000
);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment