Skip to content

Commit

Permalink
refactor groovymetricenvironment to use generics, and implement some …
Browse files Browse the repository at this point in the history
…PR feedback
  • Loading branch information
Sam DeHaan committed Jul 12, 2023
1 parent f24799b commit 8520d26
Showing 1 changed file with 76 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.logging.Logger;
import java.util.function.Supplier;
import javax.annotation.Nullable;

public class GroovyMetricEnvironment {
private static final Logger logger = Logger.getLogger(GroovyMetricEnvironment.class.getName());

private final SdkMeterProvider meterProvider;
private final Meter meter;

Expand Down Expand Up @@ -243,27 +241,11 @@ public ObservableDoubleMeasurement registerDoubleValueCallback(
InstrumentValueType.DOUBLE)
.hashCode();

// Only build the instrument if it isn't already in the registry
ObservableMeasurement obs = instrumentOnceRegistry.get(descriptorHash);
if (obs == null) {
obs = meter.gaugeBuilder(name).setDescription(description).setUnit(unit).buildObserver();
instrumentOnceRegistry.put(descriptorHash, obs);
// If an updater was not provided, the measurement is expected to be added
// to a group batchcallback using the registerBatchCallback function
if (updater != null) {
Consumer<ObservableDoubleMeasurement> cb = proxiedDoubleObserver(descriptorHash, updater);
ObservableDoubleMeasurement finalObs = (ObservableDoubleMeasurement) obs;
meter.batchCallback(() -> cb.accept(finalObs), obs);
}
} else if (updater != null) {
// If the instrument has already been built with the appropriate proxied observer,
// update the registry so that the callback has the appropriate updater function
AtomicReference<Consumer<ObservableDoubleMeasurement>> existingUpdater =
doubleUpdaterRegistry.get(descriptorHash);
existingUpdater.set(updater);
}

return (ObservableDoubleMeasurement) obs;
return registerCallback(
doubleUpdaterRegistry,
() -> meter.gaugeBuilder(name).setDescription(description).setUnit(unit).buildObserver(),
descriptorHash,
updater);
}

/**
Expand All @@ -285,33 +267,17 @@ public ObservableLongMeasurement registerLongValueCallback(
name, description, unit, InstrumentType.OBSERVABLE_GAUGE, InstrumentValueType.LONG)
.hashCode();

// Only build the instrument if it isn't already in the registry
ObservableMeasurement obs = instrumentOnceRegistry.get(descriptorHash);
if (obs == null) {
obs =
meter
.gaugeBuilder(name)
.ofLongs()
.setDescription(description)
.setUnit(unit)
.buildObserver();
instrumentOnceRegistry.put(descriptorHash, obs);
// If an updater was not provided, the measurement is expected to be added
// to a group batchcallback using the registerBatchCallback function
if (updater != null) {
Consumer<ObservableLongMeasurement> cb = proxiedLongObserver(descriptorHash, updater);
ObservableLongMeasurement finalObs = (ObservableLongMeasurement) obs;
meter.batchCallback(() -> cb.accept(finalObs), obs);
}
} else if (updater != null) {
// If the instrument has already been built with the appropriate proxied observer,
// update the registry so that the callback has the appropriate updater function
AtomicReference<Consumer<ObservableLongMeasurement>> existingUpdater =
longUpdaterRegistry.get(descriptorHash);
existingUpdater.set(updater);
}

return (ObservableLongMeasurement) obs;
return registerCallback(
longUpdaterRegistry,
() ->
meter
.gaugeBuilder(name)
.ofLongs()
.setDescription(description)
.setUnit(unit)
.buildObserver(),
descriptorHash,
updater);
}

/**
Expand All @@ -337,33 +303,17 @@ public ObservableDoubleMeasurement registerDoubleCounterCallback(
InstrumentValueType.DOUBLE)
.hashCode();

// Only build the instrument if it isn't already in the registry
ObservableMeasurement obs = instrumentOnceRegistry.get(descriptorHash);
if (obs == null) {
obs =
meter
.counterBuilder(name)
.setDescription(description)
.setUnit(unit)
.ofDoubles()
.buildObserver();
instrumentOnceRegistry.put(descriptorHash, obs);
// If an updater was not provided, the measurement is expected to be added
// to a group batchcallback using the registerBatchCallback function
if (updater != null) {
Consumer<ObservableDoubleMeasurement> cb = proxiedDoubleObserver(descriptorHash, updater);
ObservableDoubleMeasurement finalObs = (ObservableDoubleMeasurement) obs;
meter.batchCallback(() -> cb.accept(finalObs), obs);
}
} else if (updater != null) {
// If the instrument has already been built with the appropriate proxied observer,
// update the registry so that the callback has the appropriate updater function
AtomicReference<Consumer<ObservableDoubleMeasurement>> existingUpdater =
doubleUpdaterRegistry.get(descriptorHash);
existingUpdater.set(updater);
}

return (ObservableDoubleMeasurement) obs;
return registerCallback(
doubleUpdaterRegistry,
() ->
meter
.counterBuilder(name)
.setDescription(description)
.setUnit(unit)
.ofDoubles()
.buildObserver(),
descriptorHash,
updater);
}

/**
Expand All @@ -389,27 +339,11 @@ public ObservableLongMeasurement registerLongCounterCallback(
InstrumentValueType.LONG)
.hashCode();

// Only build the instrument if it isn't already in the registry
ObservableMeasurement obs = instrumentOnceRegistry.get(descriptorHash);
if (obs == null) {
obs = meter.counterBuilder(name).setDescription(description).setUnit(unit).buildObserver();
instrumentOnceRegistry.put(descriptorHash, obs);
// If an updater was not provided, the measurement is expected to be added
// to a group batchcallback using the registerBatchCallback function
if (updater != null) {
Consumer<ObservableLongMeasurement> cb = proxiedLongObserver(descriptorHash, updater);
ObservableLongMeasurement finalObs = (ObservableLongMeasurement) obs;
meter.batchCallback(() -> cb.accept(finalObs), obs);
}
} else if (updater != null) {
// If the instrument has already been built with the appropriate proxied observer,
// update the registry so that the callback has the appropriate updater function
AtomicReference<Consumer<ObservableLongMeasurement>> existingUpdater =
longUpdaterRegistry.get(descriptorHash);
existingUpdater.set(updater);
}

return (ObservableLongMeasurement) obs;
return registerCallback(
longUpdaterRegistry,
() -> meter.counterBuilder(name).setDescription(description).setUnit(unit).buildObserver(),
descriptorHash,
updater);
}

/**
Expand All @@ -435,33 +369,17 @@ public ObservableDoubleMeasurement registerDoubleUpDownCounterCallback(
InstrumentValueType.DOUBLE)
.hashCode();

// Only build the instrument if it isn't already in the registry
ObservableMeasurement obs = instrumentOnceRegistry.get(descriptorHash);
if (obs == null) {
obs =
meter
.upDownCounterBuilder(name)
.setDescription(description)
.setUnit(unit)
.ofDoubles()
.buildObserver();
instrumentOnceRegistry.put(descriptorHash, obs);
// If an updater was not provided, the measurement is expected to be added
// to a group batchcallback using the registerBatchCallback function
if (updater != null) {
Consumer<ObservableDoubleMeasurement> cb = proxiedDoubleObserver(descriptorHash, updater);
ObservableDoubleMeasurement finalObs = (ObservableDoubleMeasurement) obs;
meter.batchCallback(() -> cb.accept(finalObs), obs);
}
} else if (updater != null) {
// If the instrument has already been built with the appropriate proxied observer,
// update the registry so that the callback has the appropriate updater function
AtomicReference<Consumer<ObservableDoubleMeasurement>> existingUpdater =
doubleUpdaterRegistry.get(descriptorHash);
existingUpdater.set(updater);
}

return (ObservableDoubleMeasurement) obs;
return registerCallback(
doubleUpdaterRegistry,
() ->
meter
.upDownCounterBuilder(name)
.setDescription(description)
.setUnit(unit)
.ofDoubles()
.buildObserver(),
descriptorHash,
updater);
}

/**
Expand All @@ -487,32 +405,43 @@ public ObservableLongMeasurement registerLongUpDownCounterCallback(
InstrumentValueType.LONG)
.hashCode();

return registerCallback(
longUpdaterRegistry,
() ->
meter
.upDownCounterBuilder(name)
.setDescription(description)
.setUnit(unit)
.buildObserver(),
descriptorHash,
updater);
}

private <T extends ObservableMeasurement> T registerCallback(
final Map<Integer, AtomicReference<Consumer<T>>> registry,
final Supplier<T> observerBuilder,
final int descriptorHash,
final Consumer<T> updater) {

// Only build the instrument if it isn't already in the registry
ObservableMeasurement obs = instrumentOnceRegistry.get(descriptorHash);
if (obs == null) {
obs =
meter
.upDownCounterBuilder(name)
.setDescription(description)
.setUnit(unit)
.buildObserver();
instrumentOnceRegistry.put(descriptorHash, obs);
T observer = observerBuilder.get();
instrumentOnceRegistry.put(descriptorHash, observer);
// If an updater was not provided, the measurement is expected to be added
// to a group batchcallback using the registerBatchCallback function
if (updater != null) {
Consumer<ObservableLongMeasurement> cb = proxiedLongObserver(descriptorHash, updater);
ObservableLongMeasurement finalObs = (ObservableLongMeasurement) obs;
meter.batchCallback(() -> cb.accept(finalObs), obs);
Consumer<T> cb = proxiedObserver(descriptorHash, registry, updater);
meter.batchCallback(() -> cb.accept(observer), observer);
}
return observer;
} else if (updater != null) {
// If the instrument has already been built with the appropriate proxied observer,
// update the registry so that the callback has the appropriate updater function
AtomicReference<Consumer<ObservableLongMeasurement>> existingUpdater =
longUpdaterRegistry.get(descriptorHash);
existingUpdater.set(updater);
registry.get(descriptorHash).set(updater);
}

return (ObservableLongMeasurement) obs;
return (T) obs;
}

/**
Expand Down Expand Up @@ -543,28 +472,12 @@ public void registerBatchCallback(
}
}

private Consumer<ObservableDoubleMeasurement> proxiedDoubleObserver(
final int descriptorHash, final Consumer<ObservableDoubleMeasurement> updater) {
doubleUpdaterRegistry.putIfAbsent(descriptorHash, new AtomicReference<>());
AtomicReference<Consumer<ObservableDoubleMeasurement>> existingUpdater =
doubleUpdaterRegistry.get(descriptorHash);
existingUpdater.set(updater);
return doubleResult -> {
Consumer<ObservableDoubleMeasurement> existing =
doubleUpdaterRegistry.get(descriptorHash).get();
existing.accept(doubleResult);
};
}

private Consumer<ObservableLongMeasurement> proxiedLongObserver(
final int descriptorHash, final Consumer<ObservableLongMeasurement> updater) {
longUpdaterRegistry.putIfAbsent(descriptorHash, new AtomicReference<>());
AtomicReference<Consumer<ObservableLongMeasurement>> existingUpdater =
longUpdaterRegistry.get(descriptorHash);
existingUpdater.set(updater);
return longResult -> {
Consumer<ObservableLongMeasurement> existing = longUpdaterRegistry.get(descriptorHash).get();
existing.accept(longResult);
};
private <T extends ObservableMeasurement> Consumer<T> proxiedObserver(
final int descriptorHash,
final Map<Integer, AtomicReference<Consumer<T>>> registry,
final Consumer<T> updater) {
registry.putIfAbsent(descriptorHash, new AtomicReference<>());
registry.get(descriptorHash).set(updater);
return longResult -> registry.get(descriptorHash).get().accept(longResult);
}
}

0 comments on commit 8520d26

Please sign in to comment.