Last active
November 13, 2025 10:22
-
-
Save Elanza-48/d8ad3bae8ab22cfee7dc30afed2d3c77 to your computer and use it in GitHub Desktop.
Java Fixed sized byte buffer pool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.nio.ByteBuffer; | |
| import java.security.SecureRandom; | |
| import java.util.Set; | |
| import java.util.concurrent.CompletableFuture; | |
| import java.util.concurrent.ConcurrentHashMap; | |
| import java.util.concurrent.ConcurrentLinkedQueue; | |
| import java.util.concurrent.Executors; | |
| import java.util.concurrent.ScheduledExecutorService; | |
| import java.util.concurrent.TimeUnit; | |
| import java.util.concurrent.TimeoutException; | |
| import java.util.concurrent.atomic.AtomicInteger; | |
| import java.util.concurrent.locks.Condition; | |
| import java.util.concurrent.locks.ReentrantLock; | |
| import java.util.function.Function; | |
| import lombok.extern.slf4j.Slf4j; | |
| /** | |
| * ByteBuffer pool with blocking acquire when pool is exhausted | |
| */ | |
| @Slf4j | |
| public class ByteBufferPool { | |
| private static final long ONE_MB = 1024 * 1024L; | |
| private static final int DEFAULT_ACQUISITION_TIME_IN_MINUTES = 5; | |
| private final ConcurrentLinkedQueue<BufferWrapper> availableBuffers = new ConcurrentLinkedQueue<>(); | |
| private final Set<Integer> registeredBufferIdentities = ConcurrentHashMap.newKeySet(); | |
| private final Set<Integer> inUseBufferIdentities = ConcurrentHashMap.newKeySet(); | |
| private final AtomicInteger totalBuffers = new AtomicInteger(0); | |
| private final int maxBuffers; | |
| private final int minBuffers; | |
| private final int bufferSize; | |
| private final long idleTimeoutMs; | |
| private final ReentrantLock lock = new ReentrantLock(); | |
| private final Condition bufferAvailable = lock.newCondition(); | |
| private final ScheduledExecutorService cleanupExecutor; | |
| private final String poolInstanceId = generatePoolId(); | |
| private volatile boolean shutdown = false; | |
| private static class BufferWrapper { | |
| final ByteBuffer buffer; | |
| final int bufferIdentity; | |
| volatile long lastUsedTime; | |
| BufferWrapper(ByteBuffer buffer) { | |
| this.buffer = buffer; | |
| this.bufferIdentity = System.identityHashCode(buffer); | |
| this.lastUsedTime = System.currentTimeMillis(); | |
| } | |
| long getIdleTimeMs() { | |
| return System.currentTimeMillis() - lastUsedTime; | |
| } | |
| } | |
| /** | |
| * Exception thrown when buffer ownership validation fails | |
| */ | |
| public static class InvalidBufferException extends IllegalArgumentException { | |
| public InvalidBufferException(String message) { | |
| super(message); | |
| } | |
| } | |
| public ByteBufferPool(int maxBuffers, int bufferSizeMB) { | |
| this(Math.max(1, maxBuffers / 4), maxBuffers, bufferSizeMB, 300); | |
| } | |
| public ByteBufferPool(int minBuffers, int maxBuffers, int bufferSizeMB, int idleTimeoutSeconds) { | |
| this.maxBuffers = maxBuffers; | |
| this.minBuffers = Math.min(minBuffers, maxBuffers); | |
| this.bufferSize = bufferSizeMB * (int) ONE_MB; | |
| this.idleTimeoutMs = TimeUnit.SECONDS.toMillis(idleTimeoutSeconds); | |
| // Start cleanup task | |
| this.cleanupExecutor = Executors.newSingleThreadScheduledExecutor(r -> { | |
| Thread t = new Thread(r, "ByteBufferPool-Cleanup-" + poolInstanceId.substring(0, 8)); | |
| t.setDaemon(true); | |
| return t; | |
| }); | |
| this.cleanupExecutor.scheduleAtFixedRate( | |
| this::cleanupIdleBuffers, | |
| idleTimeoutSeconds, | |
| 2 * 60L, | |
| TimeUnit.SECONDS | |
| ); | |
| log.debug("[ByteBufferPool] Initialized pool {}: max={}, min={}, bufferSize={}MB, timeout={}s", | |
| poolInstanceId.substring(0, 8), maxBuffers, minBuffers, bufferSizeMB, idleTimeoutSeconds | |
| ); | |
| } | |
| /** | |
| * Acquire a buffer from the pool with ownership tracking | |
| */ | |
| public ByteBuffer acquire() throws TimeoutException, InterruptedException { | |
| if (shutdown) { | |
| throw new IllegalStateException("ByteBufferPool is shut down"); | |
| } | |
| // Try to get from pool first | |
| BufferWrapper wrapper = availableBuffers.poll(); | |
| if (wrapper != null && inUseBufferIdentities.add(wrapper.bufferIdentity)) { | |
| return wrapper.buffer; | |
| } | |
| // Try to create a new buffer if under limit | |
| if (totalBuffers.get() < maxBuffers) { | |
| lock.lock(); | |
| try { | |
| // Double-check after acquiring lock | |
| if (totalBuffers.get() < maxBuffers) { | |
| BufferWrapper newWrapper = createBuffer(); | |
| if (newWrapper != null && inUseBufferIdentities.add(newWrapper.bufferIdentity)) { | |
| return newWrapper.buffer; | |
| } | |
| } | |
| } finally { | |
| lock.unlock(); | |
| } | |
| } | |
| // Wait for a buffer to be released | |
| lock.lock(); | |
| try { | |
| while (!shutdown) { | |
| wrapper = availableBuffers.poll(); | |
| if (wrapper != null && inUseBufferIdentities.add(wrapper.bufferIdentity)) { | |
| return wrapper.buffer; | |
| } | |
| if (!bufferAvailable.await(DEFAULT_ACQUISITION_TIME_IN_MINUTES, TimeUnit.MINUTES)) { | |
| throw new TimeoutException("Timeout waiting for ByteBuffer from pool"); | |
| } | |
| } | |
| throw new IllegalStateException("ByteBufferPool is shut down"); | |
| } finally { | |
| lock.unlock(); | |
| } | |
| } | |
| /** | |
| * Acquire with timeout | |
| */ | |
| public ByteBuffer acquire(long timeout, TimeUnit unit) throws InterruptedException { | |
| if (shutdown) { | |
| throw new IllegalStateException("ByteBufferPool is shut down"); | |
| } | |
| long deadlineNanos = System.nanoTime() + unit.toNanos(timeout); | |
| // Try to get from pool first | |
| BufferWrapper wrapper = availableBuffers.poll(); | |
| if (wrapper != null && inUseBufferIdentities.add(wrapper.bufferIdentity)) { | |
| return wrapper.buffer; | |
| } | |
| // Try to create a new buffer if under limit | |
| if (totalBuffers.get() < maxBuffers) { | |
| lock.lock(); | |
| try { | |
| if (totalBuffers.get() < maxBuffers) { | |
| BufferWrapper newWrapper = createBuffer(); | |
| if (newWrapper != null && inUseBufferIdentities.add(newWrapper.bufferIdentity)) { | |
| return newWrapper.buffer; | |
| } | |
| } | |
| } finally { | |
| lock.unlock(); | |
| } | |
| } | |
| // Wait for a buffer to be released | |
| lock.lock(); | |
| try { | |
| while (!shutdown) { | |
| wrapper = availableBuffers.poll(); | |
| if (wrapper != null && inUseBufferIdentities.add(wrapper.bufferIdentity)) { | |
| return wrapper.buffer; | |
| } | |
| long remainingNanos = deadlineNanos - System.nanoTime(); | |
| if (remainingNanos <= 0) { | |
| return null; // Timeout | |
| } | |
| bufferAvailable.awaitNanos(remainingNanos); | |
| } | |
| throw new IllegalStateException("ByteBufferPool is shut down"); | |
| } finally { | |
| lock.unlock(); | |
| } | |
| } | |
| /** | |
| * Release a buffer back to the pool WITH VALIDATION | |
| */ | |
| public void release(ByteBuffer buffer) { | |
| if (buffer == null || shutdown) { | |
| return; | |
| } | |
| int bufferIdentity = System.identityHashCode(buffer); | |
| // CRITICAL SECURITY CHECK #1: Is this buffer from our pool? | |
| if (!registeredBufferIdentities.contains(bufferIdentity)) { | |
| throw new InvalidBufferException(String.format( | |
| "Buffer release rejected: Buffer (identity=%d) does not belong to the pool (instance=%s). " + | |
| "This may indicate a security issue or double-release attempt.", | |
| bufferIdentity, poolInstanceId.substring(0, 8) | |
| )); | |
| } | |
| // CRITICAL SECURITY CHECK #2: Is this buffer currently in use? | |
| if (!inUseBufferIdentities.remove(bufferIdentity)) { | |
| throw new InvalidBufferException(String.format( | |
| "Buffer release rejected: Buffer (identity=%d) is not currently in use. " + | |
| "This may indicate a double-release attempt.", | |
| bufferIdentity | |
| )); | |
| } | |
| // All checks passed - release is valid | |
| buffer.clear(); | |
| availableBuffers.offer(new BufferWrapper(buffer)); | |
| lock.lock(); | |
| try { | |
| bufferAvailable.signal(); | |
| } finally { | |
| lock.unlock(); | |
| } | |
| } | |
| /** | |
| * Create a new buffer and register it | |
| */ | |
| private BufferWrapper createBuffer() { | |
| try { | |
| ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize); | |
| BufferWrapper wrapper = new BufferWrapper(buffer); | |
| // Register this buffer's identity | |
| registeredBufferIdentities.add(wrapper.bufferIdentity); | |
| totalBuffers.incrementAndGet(); | |
| log.debug("[ByteBufferPool] Created buffer: {}", wrapper); | |
| return wrapper; | |
| } catch (OutOfMemoryError e) { | |
| log.error("[ByteBufferPool] Failed to allocate buffer - {} | pool id - {}", e.getMessage(), poolInstanceId); | |
| return null; | |
| } | |
| } | |
| /** | |
| * Cleanup idle buffers | |
| */ | |
| private void cleanupIdleBuffers() { | |
| if (shutdown) { | |
| return; | |
| } | |
| int currentTotal = totalBuffers.get(); | |
| if (currentTotal <= minBuffers) { | |
| return; | |
| } | |
| int cleaned = 0; | |
| BufferWrapper wrapper; | |
| while ((wrapper = availableBuffers.poll()) != null && currentTotal > minBuffers) { | |
| long idleTime = wrapper.getIdleTimeMs(); | |
| if (idleTime > idleTimeoutMs && currentTotal > minBuffers) { | |
| // Destroy this buffer | |
| registeredBufferIdentities.remove(wrapper.bufferIdentity); | |
| totalBuffers.decrementAndGet(); | |
| currentTotal--; | |
| cleaned++; | |
| // Help GC by clearing the buffer | |
| wrapper.buffer.clear(); | |
| log.debug("[ByteBufferPool] Destroyed buffer: {} (idle for {}s) | pool id - {}", | |
| wrapper, wrapper.getIdleTimeMs() / 1000, poolInstanceId); | |
| } else { | |
| availableBuffers.offer(wrapper); | |
| } | |
| } | |
| if (cleaned > 0) { | |
| log.debug("[ByteBufferPool] Cleanup complete. Destroyed {} buffers. Current: {}/{} (min: {})", | |
| cleaned, currentTotal, maxBuffers, minBuffers); | |
| System.gc(); | |
| } | |
| } | |
| /** | |
| * Force cleanup of all idle buffers above minimum | |
| */ | |
| public void forceCleanup() { | |
| cleanupIdleBuffers(); | |
| } | |
| /** | |
| * Validate if a buffer belongs to this pool (without throwing exception) | |
| */ | |
| public boolean isValidPoolBuffer(ByteBuffer buffer) { | |
| if (buffer == null) { | |
| return false; | |
| } | |
| int identity = System.identityHashCode(buffer); | |
| return registeredBufferIdentities.contains(identity); | |
| } | |
| /** | |
| * Shutdown the pool and release all resources | |
| */ | |
| public void shutdown() { | |
| shutdown = true; | |
| cleanupExecutor.shutdown(); | |
| lock.lock(); | |
| try { | |
| bufferAvailable.signalAll(); | |
| } finally { | |
| lock.unlock(); | |
| } | |
| // Clear all buffers to allow GC | |
| BufferWrapper wrapper; | |
| while ((wrapper = availableBuffers.poll()) != null) { | |
| wrapper.buffer.clear(); | |
| } | |
| registeredBufferIdentities.clear(); | |
| inUseBufferIdentities.clear(); | |
| totalBuffers.set(0); | |
| try { | |
| if (!cleanupExecutor.awaitTermination(5, TimeUnit.SECONDS)) { | |
| cleanupExecutor.shutdownNow(); | |
| } | |
| } catch (InterruptedException e) { | |
| cleanupExecutor.shutdownNow(); | |
| Thread.currentThread().interrupt(); | |
| } | |
| } | |
| /** | |
| * Executes the provided action with an automatically managed buffer. | |
| * The buffer is acquired, cleared, passed to the function, and always released, | |
| * even if the function throws an exception. | |
| */ | |
| public <T> T withBuffer(Function<ByteBuffer, T> action) { | |
| ByteBuffer buffer = null; | |
| boolean acquired = false; | |
| try { | |
| buffer = this.acquire(); // use your existing acquire() | |
| acquired = true; | |
| buffer.clear(); | |
| return action.apply(buffer); | |
| } catch (TimeoutException e) { | |
| throw new RuntimeException("Timeout while acquiring buffer from pool", e); | |
| } catch (InterruptedException e) { | |
| Thread.currentThread().interrupt(); | |
| throw new RuntimeException("Interrupted while acquiring buffer from pool", e); | |
| } finally { | |
| if (acquired && buffer != null) { | |
| try { | |
| this.release(buffer); | |
| } catch (InvalidBufferException ex) { | |
| log.error("[ByteBufferPool] Failed to release buffer: {}", ex.getMessage()); | |
| } | |
| } | |
| } | |
| } | |
| /** | |
| * Executes the provided async action with an automatically managed buffer. | |
| * The buffer is acquired and released when the returned CompletableFuture completes. | |
| */ | |
| public <T> CompletableFuture<T> withBufferAsync(Function<ByteBuffer, CompletableFuture<T>> asyncAction) { | |
| ByteBuffer buffer; | |
| try { | |
| buffer = this.acquire(); | |
| buffer.clear(); | |
| } catch (TimeoutException e) { | |
| return CompletableFuture.failedFuture( | |
| new RuntimeException("Timeout while acquiring buffer from pool", e) | |
| ); | |
| } catch (InterruptedException e) { | |
| Thread.currentThread().interrupt(); | |
| return CompletableFuture.failedFuture( | |
| new RuntimeException("Interrupted while acquiring buffer from pool", e) | |
| ); | |
| } | |
| CompletableFuture<T> resultFuture; | |
| try { | |
| resultFuture = asyncAction.apply(buffer); | |
| } catch (Throwable t) { | |
| // if the function itself throws before returning a future | |
| releaseSafe(buffer); | |
| return CompletableFuture.failedFuture(t); | |
| } | |
| // Ensure release when the async operation completes | |
| return resultFuture.whenComplete((res, ex) -> releaseSafe(buffer)); | |
| } | |
| /** | |
| * Internal helper to safely release a buffer without throwing | |
| */ | |
| private void releaseSafe(ByteBuffer buffer) { | |
| try { | |
| release(buffer); | |
| } catch (Exception e) { | |
| log.error("[ByteBufferPool] Safe release failed: {}", e.getMessage()); | |
| } | |
| } | |
| /** | |
| * Get pool statistics with security info | |
| */ | |
| public PoolStats getStats() { | |
| return new PoolStats( | |
| totalBuffers.get(), | |
| availableBuffers.size(), | |
| maxBuffers, | |
| minBuffers, | |
| bufferSize, | |
| idleTimeoutMs, | |
| poolInstanceId | |
| ); | |
| } | |
| /** | |
| * Generate unique pool instance ID | |
| */ | |
| private static String generatePoolId() { | |
| return String.format("%016x", new SecureRandom().nextLong()); | |
| } | |
| public static class PoolStats { | |
| public final int totalBuffers; | |
| public final int availableBuffers; | |
| public final int maxBuffers; | |
| public final int minBuffers; | |
| public final int bufferSize; | |
| public final long idleTimeoutMs; | |
| public final String poolInstanceId; | |
| public PoolStats(int total, int available, int max, int min, | |
| int size, long timeout, String instanceId) { | |
| this.totalBuffers = total; | |
| this.availableBuffers = available; | |
| this.maxBuffers = max; | |
| this.minBuffers = min; | |
| this.bufferSize = size; | |
| this.idleTimeoutMs = timeout; | |
| this.poolInstanceId = instanceId; | |
| } | |
| public long getTotalMemoryMB() { | |
| return (long) totalBuffers * bufferSize / (1024 * 1024); | |
| } | |
| public long getAvailableMemoryMB() { | |
| return (long) availableBuffers * bufferSize / (1024 * 1024); | |
| } | |
| @Override | |
| public String toString() { | |
| return String.format( | |
| "Pool[id=%s, total=%d, available=%d, inUse=%d, max=%d, min=%d, " + | |
| "bufferSize=%dMB, totalMem=%dMB, inUseMem=%dMB, idleTimeout=%ds]", | |
| poolInstanceId.substring(0, 8), | |
| totalBuffers, availableBuffers, maxBuffers, minBuffers, | |
| bufferSize / (1024 * 1024), getTotalMemoryMB(), idleTimeoutMs / 1000 | |
| ); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment