Skip to content

Commit

Permalink
Experiment with ArrayByteBufferPool performance (#11426)
Browse files Browse the repository at this point in the history
* Experiment with ArrayByteBufferPool

No overall size accounting
reserved buffer release always checks max memory
released buffers check max memory 1% of the time.
only a single thread can check memory at once.
single pass through buckets so no looping forever.

* Experiment with ArrayByteBufferPool

updates from review

* JMH updates

* updates from review

* Fixed comments.
Fixed call to recordEvict().
Removed unused methods.
Method getAvailable*Memory() no longer JMX-enabled, as they are the same as get*Memory().

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>

---------

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
gregw and sbordet authored Feb 21, 2024
1 parent 509ede1 commit 750584b
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.IntUnaryOperator;
Expand Down Expand Up @@ -64,9 +64,8 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
private final int _maxCapacity;
private final long _maxHeapMemory;
private final long _maxDirectMemory;
private final AtomicLong _heapMemory = new AtomicLong();
private final AtomicLong _directMemory = new AtomicLong();
private final IntUnaryOperator _bucketIndexFor;
private final AtomicBoolean _evictor = new AtomicBoolean(false);
private boolean _statisticsEnabled;

/**
Expand Down Expand Up @@ -215,7 +214,6 @@ public RetainableByteBuffer acquire(int size, boolean direct)
if (entry != null)
{
bucket.recordPooled();
subtractMemory(bucket.getCapacity(), direct);
RetainableByteBuffer buffer = entry.getPooled();
((Buffer)buffer).acquire();
return buffer;
Expand All @@ -228,35 +226,25 @@ private void reserve(RetainedBucket bucket, RetainableByteBuffer buffer)
{
bucket.recordRelease();

boolean direct = buffer.isDirect();
int capacity = bucket.getCapacity();

// Discard the buffer if maxMemory is exceeded.
long excessMemory = addMemoryAndGetExcess(bucket, direct);
if (excessMemory > 0)
{
subtractMemory(capacity, direct);
bucket.recordNonPooled();
return;
}

// Try to reserve an entry to put the buffer into the pool.
Pool.Entry<RetainableByteBuffer> entry = bucket.getPool().reserve();
// Cannot reserve, discard the buffer.
if (entry == null)
{
subtractMemory(capacity, direct);
bucket.recordNonPooled();
return;
}

// Add the buffer to the new entry.
ByteBuffer byteBuffer = buffer.getByteBuffer();
BufferUtil.reset(byteBuffer);
Buffer pooledBuffer = new Buffer(byteBuffer, b -> release(bucket, entry));
if (entry.enable(pooledBuffer, false))
{
checkMaxMemory(bucket, buffer.isDirect());
return;
}

// Discard the buffer if the entry cannot be enabled.
subtractMemory(capacity, direct);
bucket.recordNonPooled();
entry.remove();
}
Expand All @@ -267,46 +255,43 @@ private void release(RetainedBucket bucket, Pool.Entry<RetainableByteBuffer> ent

RetainableByteBuffer buffer = entry.getPooled();
BufferUtil.reset(buffer.getByteBuffer());
boolean direct = buffer.isDirect();
int capacity = bucket.getCapacity();
long excessMemory = addMemoryAndGetExcess(bucket, direct);
if (excessMemory > 0)
{
bucket.recordEvict();
// If we cannot free enough space for the entry, remove it.
if (!evict(excessMemory, bucket, direct))
{
subtractMemory(capacity, direct);
bucket.recordRemove();
entry.remove();
return;
}
}

// We have enough space for this entry, pool it.
// Release the buffer and check the memory 1% of the times.
int used = ((Buffer)buffer).use();
if (entry.release())
{
if (used % 100 == 0)
checkMaxMemory(bucket, buffer.isDirect());
return;
}

// Not enough space, discard this buffer.
subtractMemory(capacity, direct);
// Cannot release, discard this buffer.
bucket.recordRemove();
entry.remove();
}

private long addMemoryAndGetExcess(RetainedBucket bucket, boolean direct)
private void checkMaxMemory(RetainedBucket bucket, boolean direct)
{
long maxMemory = direct ? _maxDirectMemory : _maxHeapMemory;
if (maxMemory < 0)
return -1;

AtomicLong memory = direct ? _directMemory : _heapMemory;
int capacity = bucket.getCapacity();
long newMemory = memory.addAndGet(capacity);
// Account the excess at most for the bucket capacity.
return Math.min(capacity, newMemory - maxMemory);
long max = direct ? _maxDirectMemory : _maxHeapMemory;
if (max <= 0 || !_evictor.compareAndSet(false, true))
return;
try
{
long memory = getMemory(direct);
long excess = memory - max;
if (excess > 0)
{
bucket.recordEvict();
evict(excess, direct);
}
}
finally
{
_evictor.set(false);
}
}

private boolean evict(long excessMemory, RetainedBucket target, boolean direct)
private void evict(long excessMemory, boolean direct)
{
RetainedBucket[] buckets = direct ? _direct : _indirect;
int length = buckets.length;
Expand All @@ -316,28 +301,12 @@ private boolean evict(long excessMemory, RetainedBucket target, boolean direct)
RetainedBucket bucket = buckets[index++];
if (index == length)
index = 0;
// Do not evict from the bucket the buffer is released into.
if (bucket == target)
continue;

int evicted = bucket.evict();
subtractMemory(evicted, direct);

excessMemory -= evicted;
if (excessMemory <= 0)
return true;
return;
}
return false;
}

protected ByteBuffer allocate(int capacity)
{
return ByteBuffer.allocate(capacity);
}

protected ByteBuffer allocateDirect(int capacity)
{
return ByteBuffer.allocateDirect(capacity);
}

private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direct, Consumer<RetainableByteBuffer> releaser)
Expand Down Expand Up @@ -415,47 +384,27 @@ public long getHeapMemory()

private long getMemory(boolean direct)
{
AtomicLong memory = direct ? _directMemory : _heapMemory;
return memory.longValue();
}

private void subtractMemory(int amount, boolean direct)
{
AtomicLong memory = direct ? _directMemory : _heapMemory;
memory.addAndGet(-amount);
long size = 0;
for (RetainedBucket bucket : direct ? _direct : _indirect)
size += (long)bucket.getPool().getIdleCount() * bucket.getCapacity();
return size;
}

@ManagedAttribute("The available bytes retained by direct ByteBuffers")
public long getAvailableDirectMemory()
{
return getAvailableMemory(true);
return getDirectMemory();
}

@ManagedAttribute("The available bytes retained by heap ByteBuffers")
public long getAvailableHeapMemory()
{
return getAvailableMemory(false);
}

private long getAvailableMemory(boolean direct)
{
RetainedBucket[] buckets = direct ? _direct : _indirect;
long total = 0L;
for (RetainedBucket bucket : buckets)
{
long capacity = bucket.getCapacity();
total += bucket.getPool().getIdleCount() * capacity;
}
return total;
return getHeapMemory();
}

@ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION")
public void clear()
{
clearBuckets(_direct);
_directMemory.set(0);
clearBuckets(_indirect);
_heapMemory.set(0);
}

private void clearBuckets(RetainedBucket[] buckets)
Expand Down Expand Up @@ -484,8 +433,8 @@ public String toString()
super.toString(),
_minCapacity, _maxCapacity,
_direct.length,
getMemory(false), _maxHeapMemory,
getMemory(true), _maxDirectMemory);
getHeapMemory(), _maxHeapMemory,
getDirectMemory(), _maxDirectMemory);
}

private class RetainedBucket
Expand Down Expand Up @@ -634,12 +583,13 @@ private Pool.Entry<RetainableByteBuffer> evict()

private static class Buffer extends AbstractRetainableByteBuffer
{
private final Consumer<RetainableByteBuffer> releaser;
private final Consumer<RetainableByteBuffer> _releaser;
private int _usages;

private Buffer(ByteBuffer buffer, Consumer<RetainableByteBuffer> releaser)
{
super(buffer);
this.releaser = releaser;
this._releaser = releaser;
}

@Override
Expand All @@ -648,17 +598,24 @@ public boolean release()
boolean released = super.release();
if (released)
{
if (releaser != null)
releaser.accept(this);
if (_releaser != null)
_releaser.accept(this);
}
return released;
}

private int use()
{
if (++_usages < 0)
_usages = 0;
return _usages;
}
}

/**
* A variant of the {@link ArrayByteBufferPool} that
* uses buckets of buffers that increase in size by a power of
* 2 (eg 1k, 2k, 4k, 8k, etc.).
* 2 (e.g. 1k, 2k, 4k, 8k, etc.).
*/
public static class Quadratic extends ArrayByteBufferPool
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
Expand All @@ -45,22 +44,8 @@ public void testMaxMemoryEviction()

List<RetainableByteBuffer> buffers = new ArrayList<>();

buffers.add(pool.acquire(10, true));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(10, true));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(20, true));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(20, true));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(10, true));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(20, true));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(10, true));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(20, true));
assertThat(pool.getDirectMemory(), is(0L));
for (int i = 0; i < 200; i++)
buffers.add(pool.acquire(10 + i / 10, true));

assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectByteBufferCount(), is(0L));
Expand All @@ -73,7 +58,22 @@ public void testMaxMemoryEviction()
assertThat(pool.getDirectByteBufferCount(), greaterThan(0L));
assertThat(pool.getDirectByteBufferCount(), lessThan((long)buffers.size()));
assertThat(pool.getDirectMemory(), greaterThan(0L));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
assertThat(pool.getDirectMemory(), lessThan(120L));

buffers.clear();
for (int i = 0; i < 200; i++)
buffers.add(pool.acquire(10 + i / 10, true));

long maxSize = 0;
for (RetainableByteBuffer buffer : buffers)
{
buffer.release();
long size = pool.getDirectMemory();
maxSize = Math.max(size, maxSize);
}

// Test that size is never too much over target max
assertThat(maxSize, lessThan(100L));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static void main(String[] args) throws RunnerException
.measurementTime(TimeValue.milliseconds(500))
.addProfiler(AsyncProfiler.class, "dir=/tmp;output=flamegraph;event=cpu;interval=500000;libPath=" + asyncProfilerPath)
.forks(1)
.threads(10)
.threads(32)
.build();
new Runner(opt).run();
}
Expand Down Expand Up @@ -124,4 +124,12 @@ public void inputFixedCapacityOutputRandomCapacityMigrating()
output.release();
input.release();
}

@Benchmark
@BenchmarkMode({Mode.Throughput})
public void fastPathAcquireRelease()
{
RetainableByteBuffer buffer = pool.acquire(65535, true);
buffer.release();
}
}

0 comments on commit 750584b

Please sign in to comment.