From 750584bc853d9ba300f8d3fb2433158f0968ebbf Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 21 Feb 2024 22:03:00 +0100 Subject: [PATCH] Experiment with ArrayByteBufferPool performance (#11426) * Experiment with ArrayByteBufferPool No overall size accounting reserved buffer release always checks max memory released buffers check max memory 1% of the time. only a single thread can check memory at once. single pass through buckets so no looping forever. * Experiment with ArrayByteBufferPool updates from review * JMH updates * updates from review * Fixed comments. Fixed call to recordEvict(). Removed unused methods. Method getAvailable*Memory() no longer JMX-enabled, as they are the same as get*Memory(). Signed-off-by: Simone Bordet --------- Signed-off-by: Simone Bordet Co-authored-by: Simone Bordet --- .../eclipse/jetty/io/ArrayByteBufferPool.java | 153 +++++++----------- .../jetty/io/ArrayByteBufferPoolTest.java | 36 ++--- .../io/jmh/ArrayByteBufferPoolBenchmark.java | 10 +- 3 files changed, 82 insertions(+), 117 deletions(-) diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java index bd4d71a0925c..0fab64e92ae3 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java @@ -24,7 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.LongAdder; import java.util.function.Consumer; import java.util.function.IntUnaryOperator; @@ -64,9 +64,8 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable private final int _maxCapacity; private final long _maxHeapMemory; private final long _maxDirectMemory; - private final AtomicLong _heapMemory = new AtomicLong(); - private final AtomicLong _directMemory = new AtomicLong(); private final IntUnaryOperator _bucketIndexFor; + private final AtomicBoolean _evictor = new AtomicBoolean(false); private boolean _statisticsEnabled; /** @@ -215,7 +214,6 @@ public RetainableByteBuffer acquire(int size, boolean direct) if (entry != null) { bucket.recordPooled(); - subtractMemory(bucket.getCapacity(), direct); RetainableByteBuffer buffer = entry.getPooled(); ((Buffer)buffer).acquire(); return buffer; @@ -228,35 +226,25 @@ private void reserve(RetainedBucket bucket, RetainableByteBuffer buffer) { bucket.recordRelease(); - boolean direct = buffer.isDirect(); - int capacity = bucket.getCapacity(); - - // Discard the buffer if maxMemory is exceeded. - long excessMemory = addMemoryAndGetExcess(bucket, direct); - if (excessMemory > 0) - { - subtractMemory(capacity, direct); - bucket.recordNonPooled(); - return; - } - + // Try to reserve an entry to put the buffer into the pool. Pool.Entry entry = bucket.getPool().reserve(); - // Cannot reserve, discard the buffer. if (entry == null) { - subtractMemory(capacity, direct); bucket.recordNonPooled(); return; } + // Add the buffer to the new entry. ByteBuffer byteBuffer = buffer.getByteBuffer(); BufferUtil.reset(byteBuffer); Buffer pooledBuffer = new Buffer(byteBuffer, b -> release(bucket, entry)); if (entry.enable(pooledBuffer, false)) + { + checkMaxMemory(bucket, buffer.isDirect()); return; + } // Discard the buffer if the entry cannot be enabled. - subtractMemory(capacity, direct); bucket.recordNonPooled(); entry.remove(); } @@ -267,46 +255,43 @@ private void release(RetainedBucket bucket, Pool.Entry ent RetainableByteBuffer buffer = entry.getPooled(); BufferUtil.reset(buffer.getByteBuffer()); - boolean direct = buffer.isDirect(); - int capacity = bucket.getCapacity(); - long excessMemory = addMemoryAndGetExcess(bucket, direct); - if (excessMemory > 0) - { - bucket.recordEvict(); - // If we cannot free enough space for the entry, remove it. - if (!evict(excessMemory, bucket, direct)) - { - subtractMemory(capacity, direct); - bucket.recordRemove(); - entry.remove(); - return; - } - } - // We have enough space for this entry, pool it. + // Release the buffer and check the memory 1% of the times. + int used = ((Buffer)buffer).use(); if (entry.release()) + { + if (used % 100 == 0) + checkMaxMemory(bucket, buffer.isDirect()); return; + } - // Not enough space, discard this buffer. - subtractMemory(capacity, direct); + // Cannot release, discard this buffer. bucket.recordRemove(); entry.remove(); } - private long addMemoryAndGetExcess(RetainedBucket bucket, boolean direct) + private void checkMaxMemory(RetainedBucket bucket, boolean direct) { - long maxMemory = direct ? _maxDirectMemory : _maxHeapMemory; - if (maxMemory < 0) - return -1; - - AtomicLong memory = direct ? _directMemory : _heapMemory; - int capacity = bucket.getCapacity(); - long newMemory = memory.addAndGet(capacity); - // Account the excess at most for the bucket capacity. - return Math.min(capacity, newMemory - maxMemory); + long max = direct ? _maxDirectMemory : _maxHeapMemory; + if (max <= 0 || !_evictor.compareAndSet(false, true)) + return; + try + { + long memory = getMemory(direct); + long excess = memory - max; + if (excess > 0) + { + bucket.recordEvict(); + evict(excess, direct); + } + } + finally + { + _evictor.set(false); + } } - private boolean evict(long excessMemory, RetainedBucket target, boolean direct) + private void evict(long excessMemory, boolean direct) { RetainedBucket[] buckets = direct ? _direct : _indirect; int length = buckets.length; @@ -316,28 +301,12 @@ private boolean evict(long excessMemory, RetainedBucket target, boolean direct) RetainedBucket bucket = buckets[index++]; if (index == length) index = 0; - // Do not evict from the bucket the buffer is released into. - if (bucket == target) - continue; int evicted = bucket.evict(); - subtractMemory(evicted, direct); - excessMemory -= evicted; if (excessMemory <= 0) - return true; + return; } - return false; - } - - protected ByteBuffer allocate(int capacity) - { - return ByteBuffer.allocate(capacity); - } - - protected ByteBuffer allocateDirect(int capacity) - { - return ByteBuffer.allocateDirect(capacity); } private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direct, Consumer releaser) @@ -415,47 +384,27 @@ public long getHeapMemory() private long getMemory(boolean direct) { - AtomicLong memory = direct ? _directMemory : _heapMemory; - return memory.longValue(); - } - - private void subtractMemory(int amount, boolean direct) - { - AtomicLong memory = direct ? _directMemory : _heapMemory; - memory.addAndGet(-amount); + long size = 0; + for (RetainedBucket bucket : direct ? _direct : _indirect) + size += (long)bucket.getPool().getIdleCount() * bucket.getCapacity(); + return size; } - @ManagedAttribute("The available bytes retained by direct ByteBuffers") public long getAvailableDirectMemory() { - return getAvailableMemory(true); + return getDirectMemory(); } - @ManagedAttribute("The available bytes retained by heap ByteBuffers") public long getAvailableHeapMemory() { - return getAvailableMemory(false); - } - - private long getAvailableMemory(boolean direct) - { - RetainedBucket[] buckets = direct ? _direct : _indirect; - long total = 0L; - for (RetainedBucket bucket : buckets) - { - long capacity = bucket.getCapacity(); - total += bucket.getPool().getIdleCount() * capacity; - } - return total; + return getHeapMemory(); } @ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION") public void clear() { clearBuckets(_direct); - _directMemory.set(0); clearBuckets(_indirect); - _heapMemory.set(0); } private void clearBuckets(RetainedBucket[] buckets) @@ -484,8 +433,8 @@ public String toString() super.toString(), _minCapacity, _maxCapacity, _direct.length, - getMemory(false), _maxHeapMemory, - getMemory(true), _maxDirectMemory); + getHeapMemory(), _maxHeapMemory, + getDirectMemory(), _maxDirectMemory); } private class RetainedBucket @@ -634,12 +583,13 @@ private Pool.Entry evict() private static class Buffer extends AbstractRetainableByteBuffer { - private final Consumer releaser; + private final Consumer _releaser; + private int _usages; private Buffer(ByteBuffer buffer, Consumer releaser) { super(buffer); - this.releaser = releaser; + this._releaser = releaser; } @Override @@ -648,17 +598,24 @@ public boolean release() boolean released = super.release(); if (released) { - if (releaser != null) - releaser.accept(this); + if (_releaser != null) + _releaser.accept(this); } return released; } + + private int use() + { + if (++_usages < 0) + _usages = 0; + return _usages; + } } /** * A variant of the {@link ArrayByteBufferPool} that * uses buckets of buffers that increase in size by a power of - * 2 (eg 1k, 2k, 4k, 8k, etc.). + * 2 (e.g. 1k, 2k, 4k, 8k, etc.). */ public static class Quadratic extends ArrayByteBufferPool { diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java index 11f2f6da6065..59040fb24566 100644 --- a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java +++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java @@ -28,7 +28,6 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThan; -import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; @@ -45,22 +44,8 @@ public void testMaxMemoryEviction() List buffers = new ArrayList<>(); - buffers.add(pool.acquire(10, true)); - assertThat(pool.getDirectMemory(), is(0L)); - buffers.add(pool.acquire(10, true)); - assertThat(pool.getDirectMemory(), is(0L)); - buffers.add(pool.acquire(20, true)); - assertThat(pool.getDirectMemory(), is(0L)); - buffers.add(pool.acquire(20, true)); - assertThat(pool.getDirectMemory(), is(0L)); - buffers.add(pool.acquire(10, true)); - assertThat(pool.getDirectMemory(), is(0L)); - buffers.add(pool.acquire(20, true)); - assertThat(pool.getDirectMemory(), is(0L)); - buffers.add(pool.acquire(10, true)); - assertThat(pool.getDirectMemory(), is(0L)); - buffers.add(pool.acquire(20, true)); - assertThat(pool.getDirectMemory(), is(0L)); + for (int i = 0; i < 200; i++) + buffers.add(pool.acquire(10 + i / 10, true)); assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); assertThat(pool.getDirectByteBufferCount(), is(0L)); @@ -73,7 +58,22 @@ public void testMaxMemoryEviction() assertThat(pool.getDirectByteBufferCount(), greaterThan(0L)); assertThat(pool.getDirectByteBufferCount(), lessThan((long)buffers.size())); assertThat(pool.getDirectMemory(), greaterThan(0L)); - assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + assertThat(pool.getDirectMemory(), lessThan(120L)); + + buffers.clear(); + for (int i = 0; i < 200; i++) + buffers.add(pool.acquire(10 + i / 10, true)); + + long maxSize = 0; + for (RetainableByteBuffer buffer : buffers) + { + buffer.release(); + long size = pool.getDirectMemory(); + maxSize = Math.max(size, maxSize); + } + + // Test that size is never too much over target max + assertThat(maxSize, lessThan(100L)); } @Test diff --git a/tests/jetty-jmh/src/main/java/org/eclipse/jetty/io/jmh/ArrayByteBufferPoolBenchmark.java b/tests/jetty-jmh/src/main/java/org/eclipse/jetty/io/jmh/ArrayByteBufferPoolBenchmark.java index a2bd546316db..8f6574c9ea63 100644 --- a/tests/jetty-jmh/src/main/java/org/eclipse/jetty/io/jmh/ArrayByteBufferPoolBenchmark.java +++ b/tests/jetty-jmh/src/main/java/org/eclipse/jetty/io/jmh/ArrayByteBufferPoolBenchmark.java @@ -47,7 +47,7 @@ public static void main(String[] args) throws RunnerException .measurementTime(TimeValue.milliseconds(500)) .addProfiler(AsyncProfiler.class, "dir=/tmp;output=flamegraph;event=cpu;interval=500000;libPath=" + asyncProfilerPath) .forks(1) - .threads(10) + .threads(32) .build(); new Runner(opt).run(); } @@ -124,4 +124,12 @@ public void inputFixedCapacityOutputRandomCapacityMigrating() output.release(); input.release(); } + + @Benchmark + @BenchmarkMode({Mode.Throughput}) + public void fastPathAcquireRelease() + { + RetainableByteBuffer buffer = pool.acquire(65535, true); + buffer.release(); + } }