From 6fca20c535161973d8e0b83d25340d0e904a0cdc Mon Sep 17 00:00:00 2001 From: Lenin Jaganathan Date: Wed, 12 Jul 2023 10:13:44 -0700 Subject: [PATCH] Add concurrency to exponential histogram - Rescaling is not frequent behaviour (for cumulative scale should normalize after initial few recording, for delta it could increase/decrease but would be mostly stable). Instead of opting for a full synchronization on writes (which would have less performance), the choice of ReadWriteLock is chosen. --- .../internal/Base2ExponentialHistogram.java | 86 ++++++++++++++++--- .../otlp/internal/CircularCountHolder.java | 15 +++- .../CumulativeBase2ExponentialHistogram.java | 4 +- .../DeltaBase2ExponentialHistogram.java | 10 +-- .../Base2ExponentialHistogramTest.java | 30 +++---- 5 files changed, 105 insertions(+), 40 deletions(-) diff --git a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/Base2ExponentialHistogram.java b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/Base2ExponentialHistogram.java index 8f58b3c794..70e5f097db 100644 --- a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/Base2ExponentialHistogram.java +++ b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/Base2ExponentialHistogram.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import io.micrometer.common.lang.Nullable; @@ -49,6 +50,21 @@ */ public abstract class Base2ExponentialHistogram implements Histogram { + /* + * This is an alternative to full-blown synchronization on the Histogram which would + * severely affect the performance under high concurrent cases. + * + * Read lock guards adding data into histogram i,e multiple values can be recorded at + * the same time for a given scale. If scale needs to be changed, write-lock need to + * be acquired and all the operations are blocked until new scale is determined and + * updated with. + */ + private final ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(); + + private final ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock(); + + private final ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock(); + private final int maxScale; private final int maxBucketsCount; @@ -131,13 +147,33 @@ public HistogramSnapshot takeSnapshot(final long count, final double total, fina } /** - * Returns the snapshot of current recorded values. + * Returns the snapshot of current recorded values. This method is thread-safe and + * block concurrent recordings to the underlying histogram. */ - ExponentialHistogramSnapShot getCurrentValuesSnapshot() { - return (circularCountHolder.isEmpty() && zeroCount.longValue() == 0) - ? DefaultExponentialHistogramSnapShot.getEmptySnapshotForScale(scale) - : new DefaultExponentialHistogramSnapShot(scale, zeroCount.longValue(), zeroThreshold, - new ExponentialBuckets(getOffset(), getBucketCounts()), EMPTY_EXPONENTIAL_BUCKET); + ExponentialHistogramSnapShot getCurrentValuesSnapshot(final boolean shouldResetHistogram) { + writeLock.lock(); + try { + ExponentialHistogramSnapShot exponentialHistogramSnapShot = getExponentialHistogramSnapShot(); + if (shouldResetHistogram) + reset(); + return exponentialHistogramSnapShot; + } + finally { + writeLock.unlock(); + } + } + + private ExponentialHistogramSnapShot getExponentialHistogramSnapShot() { + ExponentialHistogramSnapShot exponentialHistogramSnapShot; + if (circularCountHolder.isEmpty() && zeroCount.longValue() == 0) { + // Will only be possible for a delta variant + exponentialHistogramSnapShot = DefaultExponentialHistogramSnapShot.getEmptySnapshotForScale(scale); + } + else { + exponentialHistogramSnapShot = new DefaultExponentialHistogramSnapShot(scale, zeroCount.longValue(), + zeroThreshold, new ExponentialBuckets(getOffset(), getBucketCounts()), EMPTY_EXPONENTIAL_BUCKET); + } + return exponentialHistogramSnapShot; } /** @@ -168,17 +204,38 @@ public void recordDouble(double value) { return; } - int index = base2IndexProvider.getIndexForValue(value); - if (!circularCountHolder.increment(index, 1)) { - synchronized (this) { - int downScaleFactor = getDownScaleFactor(index); + recordInternal(value); + } + + private void recordInternal(double value) { + boolean hasRecorded; + readLock.lock(); + try { + hasRecorded = incrementValue(value); + } + finally { + readLock.unlock(); + } + + if (!hasRecorded) { + // If recording fails, then we MIGHT have to re-scale. Try re-scaling. + writeLock.lock(); + try { + final int downScaleFactor = getDownScaleFactor(base2IndexProvider.getIndexForValue(value)); downScale(downScaleFactor); - index = base2IndexProvider.getIndexForValue(value); - circularCountHolder.increment(index, 1); + // Record the value within the write lock to guarantee write. + incrementValue(value); + } + finally { + writeLock.unlock(); } } } + private boolean incrementValue(final double value) { + return circularCountHolder.increment(base2IndexProvider.getIndexForValue(value), 1); + } + /** * Reduces the scale of the histogram by downScaleFactor. The buckets are merged to * align with the exponential scale. @@ -186,6 +243,7 @@ public void recordDouble(double value) { */ private void downScale(int downScaleFactor) { if (downScaleFactor == 0) { + // Should never happen. return; } @@ -286,12 +344,12 @@ private List getBucketCounts() { * Reset the current values and possibly increase the scale based on current recorded * values; */ - synchronized void reset() { + // VisibleForTesting + void reset() { int upscaleFactor = getUpscaleFactor(); if (upscaleFactor > 0) { this.updateScale(this.scale + upscaleFactor); } - this.circularCountHolder.reset(); this.zeroCount.reset(); } diff --git a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/CircularCountHolder.java b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/CircularCountHolder.java index 0911a98c9a..ef2a89a92b 100644 --- a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/CircularCountHolder.java +++ b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/CircularCountHolder.java @@ -81,8 +81,19 @@ else if (index < startIndex) { startIndex = index; } - counts.addAndGet(getRelativeIndex(index), incrementBy); - return true; + final int relativeIndex = getRelativeIndex(index); + if (relativeIndex >= 0 && relativeIndex < length) { + counts.addAndGet(relativeIndex, incrementBy); + return true; + } + /* + * This should not happen if the writes are fully exclusive but that would mean a + * poor performance. This case only possible (rarely) during initial writes on th + * histogram where there might be concurrent threads modifying the index and if it + * not fully synchronized externally. In this case, we would notify the caller + * that write has failed and let the caller take a call on what to do. + */ + return false; } private int getRelativeIndex(int index) { diff --git a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/CumulativeBase2ExponentialHistogram.java b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/CumulativeBase2ExponentialHistogram.java index 353ce6baaa..3d5219e56a 100644 --- a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/CumulativeBase2ExponentialHistogram.java +++ b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/CumulativeBase2ExponentialHistogram.java @@ -59,8 +59,8 @@ public ExponentialHistogramSnapShot getLatestExponentialHistogramSnapshot() { } @Override - synchronized void takeExponentialHistogramSnapShot() { - this.exponentialHistogramSnapShot = getCurrentValuesSnapshot(); + void takeExponentialHistogramSnapShot() { + this.exponentialHistogramSnapShot = getCurrentValuesSnapshot(false); } } diff --git a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/DeltaBase2ExponentialHistogram.java b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/DeltaBase2ExponentialHistogram.java index 83d00d72d8..b185e6a605 100644 --- a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/DeltaBase2ExponentialHistogram.java +++ b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/DeltaBase2ExponentialHistogram.java @@ -66,7 +66,7 @@ public ExponentialHistogramSnapShot getLatestExponentialHistogramSnapshot() { } @Override - synchronized void takeExponentialHistogramSnapShot() { + void takeExponentialHistogramSnapShot() { stepExponentialHistogramSnapShot.poll(); } @@ -82,12 +82,8 @@ public StepExponentialHistogramSnapShot(final Clock clock, final long stepMillis } @Override - protected synchronized Supplier valueSupplier() { - return () -> { - ExponentialHistogramSnapShot latestSnapShot = getCurrentValuesSnapshot(); - reset(); - return latestSnapShot; - }; + protected Supplier valueSupplier() { + return () -> getCurrentValuesSnapshot(true); } @Override diff --git a/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/internal/Base2ExponentialHistogramTest.java b/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/internal/Base2ExponentialHistogramTest.java index 53b65ef3c4..6393f5fb70 100644 --- a/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/internal/Base2ExponentialHistogramTest.java +++ b/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/internal/Base2ExponentialHistogramTest.java @@ -51,8 +51,8 @@ void testRecordDouble() { // 1 Always belongs to index 0. base2ExponentialHistogram.recordDouble(1.000000000001); assertThat(base2ExponentialHistogram.getScale()).isEqualTo(MAX_SCALE); - assertThat(base2ExponentialHistogram.getCurrentValuesSnapshot().zeroCount()).isZero(); - assertThat(getAllBucketsCountSum(base2ExponentialHistogram.getCurrentValuesSnapshot())).isEqualTo(1); + assertThat(base2ExponentialHistogram.getCurrentValuesSnapshot(false).zeroCount()).isZero(); + assertThat(getAllBucketsCountSum(base2ExponentialHistogram.getCurrentValuesSnapshot(false))).isEqualTo(1); } @Test @@ -66,7 +66,7 @@ void testRecordTimeBased() { // calling // recordDouble(2). - ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(); + ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false); assertThat(currentSnapshot.zeroCount()).isZero(); assertThat(currentSnapshot.scale()).isLessThan(MAX_SCALE); assertThat(getAllBucketsCountSum(currentSnapshot)).isEqualTo(2); @@ -84,13 +84,13 @@ void testRecordTimeBasedInSeconds() { // This should be same as calling recordDouble(0.05). base2ExponentialHistogram.recordLong(Duration.ofMillis(50).toNanos()); - ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(); + ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false); assertThat(currentSnapshot.zeroCount()).isZero(); assertThat(currentSnapshot.scale()).isLessThan(MAX_SCALE); assertThat(getAllBucketsCountSum(currentSnapshot)).isEqualTo(2); base2ExponentialHistogram.recordLong(Duration.ofMillis(90).toNanos()); - currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(); + currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false); assertThat(currentSnapshot.scale()).isEqualTo(1); assertThat(getAllBucketsCountSum(currentSnapshot)).isEqualTo(3); } @@ -101,7 +101,7 @@ void testZeroThreshHold() { base2ExponentialHistogram.recordDouble(0.0); base2ExponentialHistogram.recordDouble(0.5); - ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(); + ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false); assertThat(currentSnapshot.zeroThreshold()).isLessThan(1).isGreaterThan(0); assertThat(currentSnapshot.zeroCount()).isEqualTo(2); assertThat(currentSnapshot.scale()).isEqualTo(MAX_SCALE); @@ -113,7 +113,7 @@ void testZeroThreshHold() { base2ExponentialHistogramWithZeroAsMin.recordDouble(Math.nextUp(0.0)); ExponentialHistogramSnapShot snapshotWithZeroAsMin = base2ExponentialHistogramWithZeroAsMin - .getCurrentValuesSnapshot(); + .getCurrentValuesSnapshot(false); assertThat(snapshotWithZeroAsMin.zeroThreshold()).isEqualTo(0.0); assertThat(snapshotWithZeroAsMin.zeroCount()).isEqualTo(1); assertThat(snapshotWithZeroAsMin.scale()).isEqualTo(MAX_SCALE); @@ -124,7 +124,7 @@ void testZeroThreshHold() { void testDownScale() { base2ExponentialHistogram.recordDouble(1.0001); - ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(); + ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false); assertThat(currentSnapshot.zeroCount()).isZero(); assertThat(currentSnapshot.scale()).isEqualTo(MAX_SCALE); assertThat(getAllBucketsCountSum(currentSnapshot)).isEqualTo(1); @@ -182,11 +182,11 @@ void testUpscale() { @Test void testValuesAtIndices() { - ExponentialHistogramSnapShot currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(); + ExponentialHistogramSnapShot currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false); assertThat(currentValueSnapshot.positive().bucketCounts()).isEmpty(); base2ExponentialHistogram.recordDouble(1.0001); - currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(); + currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false); assertThat(currentValueSnapshot.positive().offset()).isZero(); assertThat(currentValueSnapshot.positive().bucketCounts().get(0)).isEqualTo(1); assertThat(currentValueSnapshot.positive().bucketCounts()).filteredOn(value -> value == 0).isEmpty(); @@ -195,7 +195,7 @@ void testValuesAtIndices() { base2ExponentialHistogram.recordDouble(1.0076); base2ExponentialHistogram.recordDouble(1.008); - currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(); + currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false); assertThat(currentValueSnapshot.positive().offset()).isZero(); assertThat(base2ExponentialHistogram.getScale()).isEqualTo(MAX_SCALE); assertThat(currentValueSnapshot.positive().bucketCounts().get(0)).isEqualTo(1); @@ -206,7 +206,7 @@ void testValuesAtIndices() { // We will record a value that will downscale by 1 and this should merge 2 // consecutive buckets into one. base2ExponentialHistogram.recordDouble(1.012); - currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(); + currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false); assertThat(currentValueSnapshot.positive().offset()).isZero(); assertThat(base2ExponentialHistogram.getScale()).isEqualTo(MAX_SCALE - 1); assertThat(currentValueSnapshot.positive().bucketCounts().get(0)).isEqualTo(2); @@ -216,7 +216,7 @@ void testValuesAtIndices() { // The base will reduced by a factor of more than one, base2ExponentialHistogram.recordDouble(4); - currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(); + currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false); assertThat(currentValueSnapshot.positive().offset()).isZero(); assertThat(base2ExponentialHistogram.getScale()).isEqualTo(3); assertThat(currentValueSnapshot.positive().bucketCounts().get(0)).isEqualTo(5); @@ -242,7 +242,7 @@ void reset() { base2ExponentialHistogram.recordDouble(1); base2ExponentialHistogram.recordDouble(2); - ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(); + ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false); final int intialScale = currentSnapshot.scale(); assertThat(currentSnapshot.zeroCount()).isEqualTo(1); assertThat(currentSnapshot.scale()).isLessThan(MAX_SCALE); @@ -250,7 +250,7 @@ void reset() { assertThat(getAllBucketsCountSum(currentSnapshot)).isEqualTo(2); base2ExponentialHistogram.reset(); - currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(); + currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false); assertThat(currentSnapshot.zeroCount()).isZero(); assertThat(currentSnapshot.scale()).isEqualTo(intialScale); assertThat(currentSnapshot.positive().offset()).isZero();