Skip to content

Commit

Permalink
Fix sliding window rotation #894 (#896)
Browse files Browse the repository at this point in the history
Signed-off-by: Fabian Stäber <fabian@fstab.de>
  • Loading branch information
fstab authored Nov 9, 2023
1 parent a192c78 commit bc2244d
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.lang.reflect.Array;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.function.ObjDoubleConsumer;
import java.util.function.Supplier;

Expand All @@ -22,6 +23,7 @@ public class SlidingWindow<T> {
private int currentBucket;
private long lastRotateTimestampMillis;
private final long durationBetweenRotatesMillis;
LongSupplier currentTimeMillis = System::currentTimeMillis; // to be replaced in unit tests

/**
* Example: If the {@code maxAgeSeconds} is 60 and {@code ageBuckets} is 3, then 3 instances of {@code T}
Expand Down Expand Up @@ -56,11 +58,14 @@ public synchronized T current() {
* Observe a value.
*/
public synchronized void observe(double value) {
observeFunction.accept(rotate(), value);
rotate();
for (T t : ringBuffer) {
observeFunction.accept(t, value);
}
}

private T rotate() {
long timeSinceLastRotateMillis = System.currentTimeMillis() - lastRotateTimestampMillis;
long timeSinceLastRotateMillis = currentTimeMillis.getAsLong() - lastRotateTimestampMillis;
while (timeSinceLastRotateMillis > durationBetweenRotatesMillis) {
ringBuffer[currentBucket] = constructor.get();
if (++currentBucket >= ringBuffer.length) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.prometheus.metrics.core.metrics;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class SlidingWindowTest {

static class Observer {

List<Double> values = new ArrayList<>();

public void observe(double value) {
values.add(value);
}

void assertValues(double... expectedValues) {
ArrayList<Double> expectedList = new ArrayList<>();
for (double expectedValue : expectedValues) {
expectedList.add(expectedValue);
}
Assert.assertEquals(expectedList, values);
}
}

private final AtomicLong currentTimeMillis = new AtomicLong();
private SlidingWindow<Observer> ringBuffer;
private final long maxAgeSeconds = 30;
private final int ageBuckets = 5;
private final long timeBetweenRotateMillis = maxAgeSeconds * 1000 / ageBuckets + 2;

@Before
public void setUp() {
currentTimeMillis.set(System.currentTimeMillis());
ringBuffer = new SlidingWindow<>(Observer.class, Observer::new, Observer::observe, maxAgeSeconds, ageBuckets);
ringBuffer.currentTimeMillis = currentTimeMillis::get;
}

@Test
public void testRotate() {
for (int i=0; i<ageBuckets; i++) {
currentTimeMillis.addAndGet(timeBetweenRotateMillis);
ringBuffer.observe(1.0);
}
ringBuffer.current().assertValues(1.0, 1.0, 1.0, 1.0, 1.0);
currentTimeMillis.addAndGet(timeBetweenRotateMillis);
ringBuffer.current().assertValues(1.0, 1.0, 1.0, 1.0);
currentTimeMillis.addAndGet(timeBetweenRotateMillis);
ringBuffer.current().assertValues(1.0, 1.0, 1.0);
currentTimeMillis.addAndGet(timeBetweenRotateMillis);
ringBuffer.current().assertValues(1.0, 1.0);
currentTimeMillis.addAndGet(timeBetweenRotateMillis);
ringBuffer.current().assertValues(1.0);
currentTimeMillis.addAndGet(timeBetweenRotateMillis);
ringBuffer.current().assertValues();
}

@Test
public void testMultiRotate() {
ringBuffer.observe(1.0);
currentTimeMillis.addAndGet(2 * timeBetweenRotateMillis); // 2/5 of max aqe
ringBuffer.observe(2.0);
ringBuffer.current().assertValues(1.0, 2.0);
currentTimeMillis.addAndGet(3 * timeBetweenRotateMillis); // 5/5 of max age -> first observation evicted
ringBuffer.current().assertValues(2.0);
ringBuffer.observe(3.0);
ringBuffer.current().assertValues(2.0, 3.0);
currentTimeMillis.addAndGet(2 * timeBetweenRotateMillis); // 7/5 of max age
ringBuffer.current().assertValues(3.0);
currentTimeMillis.addAndGet(3 * timeBetweenRotateMillis); // 10/5 of max age
ringBuffer.current().assertValues(); // empty
}
}

0 comments on commit bc2244d

Please sign in to comment.