Skip to content

Commit

Permalink
Add concurrency to exponential histogram
Browse files Browse the repository at this point in the history
- Rescaling is not frequent behaviour (for cumulative scale should normalize after initial few recording, for delta it could increase/decrease but would be mostly stable). Instead of opting for a full synchronization on writes (which would have less performance), the choice of ReadWriteLock is chosen.
  • Loading branch information
lenin-jaganathan committed Dec 20, 2024
1 parent 9018b50 commit 6fca20c
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import io.micrometer.common.lang.Nullable;
Expand Down Expand Up @@ -49,6 +50,21 @@
*/
public abstract class Base2ExponentialHistogram implements Histogram {

/*
* This is an alternative to full-blown synchronization on the Histogram which would
* severely affect the performance under high concurrent cases.
*
* Read lock guards adding data into histogram i,e multiple values can be recorded at
* the same time for a given scale. If scale needs to be changed, write-lock need to
* be acquired and all the operations are blocked until new scale is determined and
* updated with.
*/
private final ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();

private final ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();

private final ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

private final int maxScale;

private final int maxBucketsCount;
Expand Down Expand Up @@ -131,13 +147,33 @@ public HistogramSnapshot takeSnapshot(final long count, final double total, fina
}

/**
* Returns the snapshot of current recorded values.
* Returns the snapshot of current recorded values. This method is thread-safe and
* block concurrent recordings to the underlying histogram.
*/
ExponentialHistogramSnapShot getCurrentValuesSnapshot() {
return (circularCountHolder.isEmpty() && zeroCount.longValue() == 0)
? DefaultExponentialHistogramSnapShot.getEmptySnapshotForScale(scale)
: new DefaultExponentialHistogramSnapShot(scale, zeroCount.longValue(), zeroThreshold,
new ExponentialBuckets(getOffset(), getBucketCounts()), EMPTY_EXPONENTIAL_BUCKET);
ExponentialHistogramSnapShot getCurrentValuesSnapshot(final boolean shouldResetHistogram) {
writeLock.lock();
try {
ExponentialHistogramSnapShot exponentialHistogramSnapShot = getExponentialHistogramSnapShot();
if (shouldResetHistogram)
reset();
return exponentialHistogramSnapShot;
}
finally {
writeLock.unlock();
}
}

private ExponentialHistogramSnapShot getExponentialHistogramSnapShot() {
ExponentialHistogramSnapShot exponentialHistogramSnapShot;
if (circularCountHolder.isEmpty() && zeroCount.longValue() == 0) {
// Will only be possible for a delta variant
exponentialHistogramSnapShot = DefaultExponentialHistogramSnapShot.getEmptySnapshotForScale(scale);
}
else {
exponentialHistogramSnapShot = new DefaultExponentialHistogramSnapShot(scale, zeroCount.longValue(),
zeroThreshold, new ExponentialBuckets(getOffset(), getBucketCounts()), EMPTY_EXPONENTIAL_BUCKET);
}
return exponentialHistogramSnapShot;
}

/**
Expand Down Expand Up @@ -168,24 +204,46 @@ public void recordDouble(double value) {
return;
}

int index = base2IndexProvider.getIndexForValue(value);
if (!circularCountHolder.increment(index, 1)) {
synchronized (this) {
int downScaleFactor = getDownScaleFactor(index);
recordInternal(value);
}

private void recordInternal(double value) {
boolean hasRecorded;
readLock.lock();
try {
hasRecorded = incrementValue(value);
}
finally {
readLock.unlock();
}

if (!hasRecorded) {
// If recording fails, then we MIGHT have to re-scale. Try re-scaling.
writeLock.lock();
try {
final int downScaleFactor = getDownScaleFactor(base2IndexProvider.getIndexForValue(value));
downScale(downScaleFactor);
index = base2IndexProvider.getIndexForValue(value);
circularCountHolder.increment(index, 1);
// Record the value within the write lock to guarantee write.
incrementValue(value);
}
finally {
writeLock.unlock();
}
}
}

private boolean incrementValue(final double value) {
return circularCountHolder.increment(base2IndexProvider.getIndexForValue(value), 1);
}

/**
* Reduces the scale of the histogram by downScaleFactor. The buckets are merged to
* align with the exponential scale.
* @param downScaleFactor - the factor to downscale this histogram.
*/
private void downScale(int downScaleFactor) {
if (downScaleFactor == 0) {
// Should never happen.
return;
}

Expand Down Expand Up @@ -286,12 +344,12 @@ private List<Long> getBucketCounts() {
* Reset the current values and possibly increase the scale based on current recorded
* values;
*/
synchronized void reset() {
// VisibleForTesting
void reset() {
int upscaleFactor = getUpscaleFactor();
if (upscaleFactor > 0) {
this.updateScale(this.scale + upscaleFactor);
}

this.circularCountHolder.reset();
this.zeroCount.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,19 @@ else if (index < startIndex) {
startIndex = index;
}

counts.addAndGet(getRelativeIndex(index), incrementBy);
return true;
final int relativeIndex = getRelativeIndex(index);
if (relativeIndex >= 0 && relativeIndex < length) {
counts.addAndGet(relativeIndex, incrementBy);
return true;
}
/*
* This should not happen if the writes are fully exclusive but that would mean a
* poor performance. This case only possible (rarely) during initial writes on th
* histogram where there might be concurrent threads modifying the index and if it
* not fully synchronized externally. In this case, we would notify the caller
* that write has failed and let the caller take a call on what to do.
*/
return false;
}

private int getRelativeIndex(int index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public ExponentialHistogramSnapShot getLatestExponentialHistogramSnapshot() {
}

@Override
synchronized void takeExponentialHistogramSnapShot() {
this.exponentialHistogramSnapShot = getCurrentValuesSnapshot();
void takeExponentialHistogramSnapShot() {
this.exponentialHistogramSnapShot = getCurrentValuesSnapshot(false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public ExponentialHistogramSnapShot getLatestExponentialHistogramSnapshot() {
}

@Override
synchronized void takeExponentialHistogramSnapShot() {
void takeExponentialHistogramSnapShot() {
stepExponentialHistogramSnapShot.poll();
}

Expand All @@ -82,12 +82,8 @@ public StepExponentialHistogramSnapShot(final Clock clock, final long stepMillis
}

@Override
protected synchronized Supplier<ExponentialHistogramSnapShot> valueSupplier() {
return () -> {
ExponentialHistogramSnapShot latestSnapShot = getCurrentValuesSnapshot();
reset();
return latestSnapShot;
};
protected Supplier<ExponentialHistogramSnapShot> valueSupplier() {
return () -> getCurrentValuesSnapshot(true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ void testRecordDouble() {
// 1 Always belongs to index 0.
base2ExponentialHistogram.recordDouble(1.000000000001);
assertThat(base2ExponentialHistogram.getScale()).isEqualTo(MAX_SCALE);
assertThat(base2ExponentialHistogram.getCurrentValuesSnapshot().zeroCount()).isZero();
assertThat(getAllBucketsCountSum(base2ExponentialHistogram.getCurrentValuesSnapshot())).isEqualTo(1);
assertThat(base2ExponentialHistogram.getCurrentValuesSnapshot(false).zeroCount()).isZero();
assertThat(getAllBucketsCountSum(base2ExponentialHistogram.getCurrentValuesSnapshot(false))).isEqualTo(1);
}

@Test
Expand All @@ -66,7 +66,7 @@ void testRecordTimeBased() {
// calling
// recordDouble(2).

ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
assertThat(currentSnapshot.zeroCount()).isZero();
assertThat(currentSnapshot.scale()).isLessThan(MAX_SCALE);
assertThat(getAllBucketsCountSum(currentSnapshot)).isEqualTo(2);
Expand All @@ -84,13 +84,13 @@ void testRecordTimeBasedInSeconds() {
// This should be same as calling recordDouble(0.05).
base2ExponentialHistogram.recordLong(Duration.ofMillis(50).toNanos());

ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
assertThat(currentSnapshot.zeroCount()).isZero();
assertThat(currentSnapshot.scale()).isLessThan(MAX_SCALE);
assertThat(getAllBucketsCountSum(currentSnapshot)).isEqualTo(2);

base2ExponentialHistogram.recordLong(Duration.ofMillis(90).toNanos());
currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
assertThat(currentSnapshot.scale()).isEqualTo(1);
assertThat(getAllBucketsCountSum(currentSnapshot)).isEqualTo(3);
}
Expand All @@ -101,7 +101,7 @@ void testZeroThreshHold() {
base2ExponentialHistogram.recordDouble(0.0);
base2ExponentialHistogram.recordDouble(0.5);

ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
assertThat(currentSnapshot.zeroThreshold()).isLessThan(1).isGreaterThan(0);
assertThat(currentSnapshot.zeroCount()).isEqualTo(2);
assertThat(currentSnapshot.scale()).isEqualTo(MAX_SCALE);
Expand All @@ -113,7 +113,7 @@ void testZeroThreshHold() {
base2ExponentialHistogramWithZeroAsMin.recordDouble(Math.nextUp(0.0));

ExponentialHistogramSnapShot snapshotWithZeroAsMin = base2ExponentialHistogramWithZeroAsMin
.getCurrentValuesSnapshot();
.getCurrentValuesSnapshot(false);
assertThat(snapshotWithZeroAsMin.zeroThreshold()).isEqualTo(0.0);
assertThat(snapshotWithZeroAsMin.zeroCount()).isEqualTo(1);
assertThat(snapshotWithZeroAsMin.scale()).isEqualTo(MAX_SCALE);
Expand All @@ -124,7 +124,7 @@ void testZeroThreshHold() {
void testDownScale() {
base2ExponentialHistogram.recordDouble(1.0001);

ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
assertThat(currentSnapshot.zeroCount()).isZero();
assertThat(currentSnapshot.scale()).isEqualTo(MAX_SCALE);
assertThat(getAllBucketsCountSum(currentSnapshot)).isEqualTo(1);
Expand Down Expand Up @@ -182,11 +182,11 @@ void testUpscale() {

@Test
void testValuesAtIndices() {
ExponentialHistogramSnapShot currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
ExponentialHistogramSnapShot currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
assertThat(currentValueSnapshot.positive().bucketCounts()).isEmpty();

base2ExponentialHistogram.recordDouble(1.0001);
currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
assertThat(currentValueSnapshot.positive().offset()).isZero();
assertThat(currentValueSnapshot.positive().bucketCounts().get(0)).isEqualTo(1);
assertThat(currentValueSnapshot.positive().bucketCounts()).filteredOn(value -> value == 0).isEmpty();
Expand All @@ -195,7 +195,7 @@ void testValuesAtIndices() {

base2ExponentialHistogram.recordDouble(1.0076);
base2ExponentialHistogram.recordDouble(1.008);
currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
assertThat(currentValueSnapshot.positive().offset()).isZero();
assertThat(base2ExponentialHistogram.getScale()).isEqualTo(MAX_SCALE);
assertThat(currentValueSnapshot.positive().bucketCounts().get(0)).isEqualTo(1);
Expand All @@ -206,7 +206,7 @@ void testValuesAtIndices() {
// We will record a value that will downscale by 1 and this should merge 2
// consecutive buckets into one.
base2ExponentialHistogram.recordDouble(1.012);
currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
assertThat(currentValueSnapshot.positive().offset()).isZero();
assertThat(base2ExponentialHistogram.getScale()).isEqualTo(MAX_SCALE - 1);
assertThat(currentValueSnapshot.positive().bucketCounts().get(0)).isEqualTo(2);
Expand All @@ -216,7 +216,7 @@ void testValuesAtIndices() {

// The base will reduced by a factor of more than one,
base2ExponentialHistogram.recordDouble(4);
currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
assertThat(currentValueSnapshot.positive().offset()).isZero();
assertThat(base2ExponentialHistogram.getScale()).isEqualTo(3);
assertThat(currentValueSnapshot.positive().bucketCounts().get(0)).isEqualTo(5);
Expand All @@ -242,15 +242,15 @@ void reset() {
base2ExponentialHistogram.recordDouble(1);
base2ExponentialHistogram.recordDouble(2);

ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
final int intialScale = currentSnapshot.scale();
assertThat(currentSnapshot.zeroCount()).isEqualTo(1);
assertThat(currentSnapshot.scale()).isLessThan(MAX_SCALE);
assertThat(currentSnapshot.positive().offset()).isNotZero();
assertThat(getAllBucketsCountSum(currentSnapshot)).isEqualTo(2);

base2ExponentialHistogram.reset();
currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
assertThat(currentSnapshot.zeroCount()).isZero();
assertThat(currentSnapshot.scale()).isEqualTo(intialScale);
assertThat(currentSnapshot.positive().offset()).isZero();
Expand Down

0 comments on commit 6fca20c

Please sign in to comment.