Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dvc][fc][tc] Lazily register client-side metrics #1104

Open
wants to merge 26 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
430d0ed
Lazily init record transformer metrics
kvargha Aug 6, 2024
a39aab1
lazily init thin-client metrics
kvargha Aug 6, 2024
3bb215f
Don't lazily init some thin client metrics
kvargha Aug 6, 2024
70343be
Don't lazily init some thin client metrics
kvargha Aug 6, 2024
0e4efb8
Don't lazily init some thin client metrics
kvargha Aug 6, 2024
ee1f1cc
Lazily init fast client metrics
kvargha Aug 6, 2024
6e01889
Don't lazily init some fast client metrics
kvargha Aug 6, 2024
e5a1184
Don't lazily init some fast client metrics
kvargha Aug 6, 2024
cae4dac
Lazily init some dvc metrics
kvargha Aug 7, 2024
b9f02e8
Don't Lazily init some dvc metrics
kvargha Aug 7, 2024
08d6416
Lazily init some dvc metrics
kvargha Aug 7, 2024
37328cb
Don't Lazily init some dvc metrics
kvargha Aug 7, 2024
a642740
Lazily init some dvc metrics
kvargha Aug 7, 2024
237fb50
Go back
kvargha Aug 8, 2024
a7746f6
Undo more stuff
kvargha Aug 8, 2024
b83eab8
Lazily init some dvc metrics
kvargha Aug 8, 2024
d17737b
Lazily init some dvc metrics
kvargha Aug 8, 2024
1650f93
Don't Lazily init some dvc metrics
kvargha Aug 8, 2024
9a13271
Don't Lazily init some dvc metrics
kvargha Aug 8, 2024
e744219
Lazily init some dvc metrics
kvargha Aug 8, 2024
b1de07a
Don't lazily init some metrics
kvargha Aug 8, 2024
fc87595
Create lazy wrapper method for thin client
kvargha Aug 8, 2024
8b0540f
Create lazy wrapper method for fast client
kvargha Aug 8, 2024
de408b2
Create lazy wrapper method for dvc
kvargha Aug 8, 2024
99a13da
Don't lazily init some thin client metrics
kvargha Aug 8, 2024
159128b
Don't lazily init some thin client metrics
kvargha Aug 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -446,12 +446,6 @@ public StoreIngestionTask(
getRecordTransformer != null ? getRecordTransformer.apply(store.getCurrentVersion()) : null;
this.recordTransformer =
clientRecordTransformer != null ? new BlockingDaVinciRecordTransformer(clientRecordTransformer) : null;
if (this.recordTransformer != null) {
versionedIngestionStats.registerTransformerLatencySensor(storeName, versionNumber);
versionedIngestionStats.registerTransformerLifecycleStartLatency(storeName, versionNumber);
versionedIngestionStats.registerTransformerLifecycleEndLatency(storeName, versionNumber);
versionedIngestionStats.registerTransformerErrorSensor(storeName, versionNumber);
}

this.localKafkaServer = this.kafkaProps.getProperty(KAFKA_BOOTSTRAP_SERVERS);
this.localKafkaServerSingletonSet = Collections.singleton(localKafkaServer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,24 +235,4 @@ public void recordTransformerError(String storeName, int version, double value,
/**
 * Records the maximum idle time (in milliseconds) observed among partitions of the
 * given store version, delegating to that version's per-store stats object.
 */
public void recordMaxIdleTime(String storeName, int version, long idleTimeMs) {
  var versionStats = getStats(storeName, version);
  versionStats.recordIdleTime(idleTimeMs);
}

public void registerTransformerLatencySensor(String storeName, int version) {
getStats(storeName, version).registerTransformerLatencySensor();
getTotalStats(storeName).registerTransformerLatencySensor();
}

public void registerTransformerLifecycleStartLatency(String storeName, int version) {
getStats(storeName, version).registerTransformerLifecycleStartLatencySensor();
getTotalStats(storeName).registerTransformerLifecycleStartLatencySensor();
}

public void registerTransformerLifecycleEndLatency(String storeName, int version) {
getStats(storeName, version).registerTransformerLifecycleEndLatencySensor();
getTotalStats(storeName).registerTransformerLifecycleEndLatencySensor();
}

public void registerTransformerErrorSensor(String storeName, int version) {
getStats(storeName, version).registerTransformerErrorSensor();
getTotalStats(storeName).registerTransformerErrorSensor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.linkedin.davinci.kafka.consumer.StoreIngestionTask;
import com.linkedin.venice.stats.LongAdderRateGauge;
import com.linkedin.venice.utils.RegionUtils;
import com.linkedin.venice.utils.lazy.Lazy;
import io.tehuti.metrics.MetricConfig;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
Expand Down Expand Up @@ -93,9 +94,9 @@ public class IngestionStats {
private final WritePathLatencySensor consumedRecordEndToEndProcessingLatencySensor;
private final WritePathLatencySensor nearlineProducerToLocalBrokerLatencySensor;
private final WritePathLatencySensor nearlineLocalBrokerToReadyToServeLatencySensor;
private WritePathLatencySensor transformerLatencySensor;
private WritePathLatencySensor transformerLifecycleStartLatencySensor;
private WritePathLatencySensor transformerLifecycleEndLatencySensor;
private final WritePathLatencySensor transformerLatencySensor;
private final WritePathLatencySensor transformerLifecycleStartLatencySensor;
private final WritePathLatencySensor transformerLifecycleEndLatencySensor;
private final WritePathLatencySensor producerCallBackLatency;
private final WritePathLatencySensor leaderPreprocessingLatency;
private final WritePathLatencySensor internalPreprocessingLatency;
Expand All @@ -109,14 +110,14 @@ public class IngestionStats {

/** Record a version-level offset rewind events for VTs across all stores. */
private final Count versionTopicEndOffsetRewindCount = new Count();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to do this one too?

private final Sensor versionTopicEndOffsetRewindSensor;
private final Lazy<Sensor> versionTopicEndOffsetRewindSensor;
private final MetricsRepository localMetricRepository;

// Measure the max idle time among partitions for a given the store on this host
private final LongAdderRateGauge idleTimeSensor = new LongAdderRateGauge();

private Count transformerErrorCount = new Count();
private Sensor transformerErrorSensor;
private final Count transformerErrorCount = new Count();
private final Lazy<Sensor> transformerErrorSensor;

public IngestionStats(VeniceServerConfig serverConfig) {

Expand Down Expand Up @@ -171,8 +172,11 @@ public IngestionStats(VeniceServerConfig serverConfig) {
registerSensor(localMetricRepository, LEADER_RECORDS_PRODUCED_METRIC_NAME, leaderRecordsProducedSensor);
registerSensor(localMetricRepository, LEADER_BYTES_PRODUCED_METRIC_NAME, leaderBytesProducedSensor);

versionTopicEndOffsetRewindSensor = localMetricRepository.sensor(VERSION_TOPIC_END_OFFSET_REWIND_COUNT);
versionTopicEndOffsetRewindSensor.add(VERSION_TOPIC_END_OFFSET_REWIND_COUNT, versionTopicEndOffsetRewindCount);
versionTopicEndOffsetRewindSensor = Lazy.of(() -> {
Sensor versionTopicEndOffsetRewindSensor = localMetricRepository.sensor(VERSION_TOPIC_END_OFFSET_REWIND_COUNT);
versionTopicEndOffsetRewindSensor.add(VERSION_TOPIC_END_OFFSET_REWIND_COUNT, versionTopicEndOffsetRewindCount);
return versionTopicEndOffsetRewindSensor;
});

producerSourceBrokerLatencySensor =
new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, "producer_to_source_broker_latency");
Expand Down Expand Up @@ -207,6 +211,17 @@ public IngestionStats(VeniceServerConfig serverConfig) {
registerSensor(localMetricRepository, OFFSET_REGRESSION_DCR_ERROR, offsetRegressionDCRErrorSensor);
registerSensor(localMetricRepository, TOMBSTONE_CREATION_DCR, tombstoneCreationDCRSensor);
registerSensor(localMetricRepository, IDLE_TIME, idleTimeSensor);

transformerLatencySensor = new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LATENCY);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused here. Why are some of these lazy and some not? When do these get registered?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some metrics are required to be registered by some unit/integration tests by asserting that their value is 0. Should I modify the tests to assert that the sensor be null instead?

The underlying sensor inside WritePathLatencySensor is registered as lazy, but when a record is called on it the sensor will be registered.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's ok to edit tests if the behavior no longer matches up. I think we should be uniform here unless theres a valid reason to not do so.

transformerLifecycleStartLatencySensor =
new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LIFECYCLE_START_LATENCY);
transformerLifecycleEndLatencySensor =
new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LIFECYCLE_END_LATENCY);
transformerErrorSensor = Lazy.of(() -> {
Sensor transformerErrorSensor = localMetricRepository.sensor(TRANSFORMER_ERROR_COUNT);
transformerErrorSensor.add(TRANSFORMER_ERROR_COUNT, transformerErrorCount);
return transformerErrorSensor;
});
}

private void registerSensor(MetricsRepository localMetricRepository, String sensorName, LongAdderRateGauge gauge) {
Expand Down Expand Up @@ -376,7 +391,7 @@ public void recordInternalPreprocessingLatency(double value, long currentTimeMs)
}

public void recordVersionTopicEndOffsetRewind() {
versionTopicEndOffsetRewindSensor.record();
versionTopicEndOffsetRewindSensor.get().record();
}

public double getVersionTopicEndOffsetRewindCount() {
Expand Down Expand Up @@ -568,55 +583,25 @@ public void recordNearlineLocalBrokerToReadyToServeLatency(double value, long cu
}

public void recordTransformerError(double value, long currentTimeMs) {
transformerErrorSensor.record(value, currentTimeMs);
}

public void registerTransformerErrorSensor() {
if (transformerErrorSensor == null) {
transformerErrorSensor = localMetricRepository.sensor(TRANSFORMER_ERROR_COUNT);
transformerErrorSensor.add(TRANSFORMER_ERROR_COUNT, transformerErrorCount);
}
transformerErrorSensor.get().record(value, currentTimeMs);
}

public double getTransformerErrorCount() {
if (transformerErrorCount != null) {
return transformerErrorCount.measure(METRIC_CONFIG, System.currentTimeMillis());
}
return 0;
return transformerErrorCount.measure(METRIC_CONFIG, System.currentTimeMillis());
}

/** Records the latency of a single record-transformer invocation into the transformer latency sensor. */
public void recordTransformerLatency(double value, long currentTimeMs) {
transformerLatencySensor.record(value, currentTimeMs);
}

public void registerTransformerLatencySensor() {
if (transformerLatencySensor == null) {
transformerLatencySensor = new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LATENCY);
}
}

/** Records the latency of the record transformer's lifecycle start phase. */
public void recordTransformerLifecycleStartLatency(double value, long currentTimeMs) {
transformerLifecycleStartLatencySensor.record(value, currentTimeMs);
}

public void registerTransformerLifecycleStartLatencySensor() {
if (transformerLifecycleStartLatencySensor == null) {
transformerLifecycleStartLatencySensor =
new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LIFECYCLE_START_LATENCY);
}
}

/** Records the latency of the record transformer's lifecycle end phase. */
public void recordTransformerLifecycleEndLatency(double value, long currentTimeMs) {
transformerLifecycleEndLatencySensor.record(value, currentTimeMs);
}

public void registerTransformerLifecycleEndLatencySensor() {
if (transformerLifecycleEndLatencySensor == null) {
transformerLifecycleEndLatencySensor =
new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LIFECYCLE_END_LATENCY);
}
}

/** Records an idle-time sample into the rate-gauge-backed idle time sensor. */
public void recordIdleTime(long value) {
idleTimeSensor.record(value);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.davinci.stats;

import com.linkedin.venice.stats.AbstractVeniceStats;
import com.linkedin.venice.utils.lazy.Lazy;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
import io.tehuti.metrics.stats.Gauge;
Expand All @@ -10,17 +11,17 @@
public class IsolatedIngestionProcessHeartbeatStats extends AbstractVeniceStats {
private static final String METRICS_PREFIX = "ingestion_isolation_heartbeat";
// Delay in millis since last successful heartbeat query.
private final Sensor heartbeatAgeSensor;
private final Lazy<Sensor> heartbeatAgeSensor;
private final Sensor forkedProcessRestartSensor;

public IsolatedIngestionProcessHeartbeatStats(MetricsRepository metricsRepository) {
super(metricsRepository, METRICS_PREFIX);
heartbeatAgeSensor = registerSensor("heartbeat_age", new Gauge());
heartbeatAgeSensor = registerLazySensor("heartbeat_age", new Gauge());
forkedProcessRestartSensor = registerSensor("forked_process_restart", new OccurrenceRate());
}

public void recordHeartbeatAge(long heartbeatAgeInMs) {
heartbeatAgeSensor.record(heartbeatAgeInMs);
heartbeatAgeSensor.get().record(heartbeatAgeInMs);
}

public void recordForkedProcessRestart() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.venice.stats.AbstractVeniceStats;
import com.linkedin.venice.stats.LongAdderRateGauge;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.lazy.Lazy;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
import io.tehuti.metrics.stats.AsyncGauge;
Expand All @@ -24,25 +25,25 @@

public class KafkaConsumerServiceStats extends AbstractVeniceStats {
private final LongAdderRateGauge pollRequestSensor;
private final Sensor pollRequestLatencySensor;
private final Lazy<Sensor> pollRequestLatencySensor;
private final Sensor pollResultNumSensor;
private final LongAdderRateGauge pollNonZeroResultNumSensor;

private final Sensor pollRequestError;
private final Sensor consumerRecordsProducingToWriterBufferLatencySensor;
private final Sensor detectedDeletedTopicNumSensor;
private final Sensor detectedNoRunningIngestionTopicPartitionNumSensor;
private final Sensor delegateSubscribeLatencySensor;
private final Sensor updateCurrentAssignmentLatencySensor;
private final Sensor maxPartitionsPerConsumer;
private final Sensor minPartitionsPerConsumer;
private final Sensor avgPartitionsPerConsumer;
private final Lazy<Sensor> consumerRecordsProducingToWriterBufferLatencySensor;
private final Lazy<Sensor> detectedDeletedTopicNumSensor;
private final Lazy<Sensor> detectedNoRunningIngestionTopicPartitionNumSensor;
private final Lazy<Sensor> delegateSubscribeLatencySensor;
private final Lazy<Sensor> updateCurrentAssignmentLatencySensor;
private final Lazy<Sensor> maxPartitionsPerConsumer;
private final Lazy<Sensor> minPartitionsPerConsumer;
private final Lazy<Sensor> avgPartitionsPerConsumer;
private final Sensor getOffsetLagIsAbsentSensor;
private final Sensor getOffsetLagIsPresentSensor;
private final Sensor getLatestOffsetIsAbsentSensor;
private final Sensor getLatestOffsetIsPresentSensor;
private final Sensor byteSizeSensor;
private final Sensor idleTimeSensor;
private final Lazy<Sensor> idleTimeSensor;

public KafkaConsumerServiceStats(
MetricsRepository metricsRepository,
Expand Down Expand Up @@ -71,12 +72,12 @@ public KafkaConsumerServiceStats(
*/

// the consumer idle time
idleTimeSensor = registerSensor("idle_time", new Max());
idleTimeSensor = registerLazySensor("idle_time", new Max());
// the number of poll requests
pollRequestSensor =
registerOnlyTotalRate("consumer_poll_request", totalStats, () -> totalStats.pollRequestSensor, time);
// Notice that "pollRequestLatencySensor" only reports correct data when consumer task threads are not stuck
pollRequestLatencySensor = registerSensor("consumer_poll_request_latency", new Avg(), new Max());
pollRequestLatencySensor = registerLazySensor("consumer_poll_request_latency", new Avg(), new Max());
pollNonZeroResultNumSensor = registerOnlyTotalRate(
"consumer_poll_non_zero_result_num",
totalStats,
Expand All @@ -96,16 +97,17 @@ public KafkaConsumerServiceStats(
pollRequestError = registerSensor("consumer_poll_error", new OccurrenceRate());
// To measure 'put' latency of consumer records blocking queue
consumerRecordsProducingToWriterBufferLatencySensor =
registerSensor("consumer_records_producing_to_write_buffer_latency", new Avg(), new Max());
detectedDeletedTopicNumSensor = registerSensor("detected_deleted_topic_num", new Total());
registerLazySensor("consumer_records_producing_to_write_buffer_latency", new Avg(), new Max());
detectedDeletedTopicNumSensor = registerLazySensor("detected_deleted_topic_num", new Total());
detectedNoRunningIngestionTopicPartitionNumSensor =
registerSensor("detected_no_running_ingestion_topic_partition_num", new Total());
delegateSubscribeLatencySensor = registerSensor("delegate_subscribe_latency", new Avg(), new Max());
updateCurrentAssignmentLatencySensor = registerSensor("update_current_assignment_latency", new Avg(), new Max());
registerLazySensor("detected_no_running_ingestion_topic_partition_num", new Total());
delegateSubscribeLatencySensor = registerLazySensor("delegate_subscribe_latency", new Avg(), new Max());
updateCurrentAssignmentLatencySensor =
registerLazySensor("update_current_assignment_latency", new Avg(), new Max());

minPartitionsPerConsumer = registerSensor("min_partitions_per_consumer", new Gauge());
maxPartitionsPerConsumer = registerSensor("max_partitions_per_consumer", new Gauge());
avgPartitionsPerConsumer = registerSensor("avg_partitions_per_consumer", new Gauge());
minPartitionsPerConsumer = registerLazySensor("min_partitions_per_consumer", new Gauge());
maxPartitionsPerConsumer = registerLazySensor("max_partitions_per_consumer", new Gauge());
avgPartitionsPerConsumer = registerLazySensor("avg_partitions_per_consumer", new Gauge());

Sensor getOffsetLagSensor = registerSensor("getOffsetLag", new OccurrenceRate());
Sensor[] offsetLagParent = new Sensor[] { getOffsetLagSensor };
Expand All @@ -122,7 +124,7 @@ public KafkaConsumerServiceStats(

public void recordPollRequestLatency(double latency) {
pollRequestSensor.record();
pollRequestLatencySensor.record(latency);
pollRequestLatencySensor.get().record(latency);
}

public void recordPollResultNum(int count) {
Expand All @@ -134,39 +136,39 @@ public void recordNonZeroPollResultNum(int count) {
}

public void recordConsumerRecordsProducingToWriterBufferLatency(double latency) {
consumerRecordsProducingToWriterBufferLatencySensor.record(latency);
consumerRecordsProducingToWriterBufferLatencySensor.get().record(latency);
}

/** Records one occurrence of an error during a consumer poll. */
public void recordPollError() {
pollRequestError.record();
}

public void recordDetectedDeletedTopicNum(int count) {
detectedDeletedTopicNumSensor.record(count);
detectedDeletedTopicNumSensor.get().record(count);
}

public void recordDetectedNoRunningIngestionTopicPartitionNum(int count) {
detectedNoRunningIngestionTopicPartitionNumSensor.record(count);
detectedNoRunningIngestionTopicPartitionNumSensor.get().record(count);
}

public void recordDelegateSubscribeLatency(double value) {
delegateSubscribeLatencySensor.record(value);
delegateSubscribeLatencySensor.get().record(value);
}

public void recordUpdateCurrentAssignmentLatency(double value) {
updateCurrentAssignmentLatencySensor.record(value);
updateCurrentAssignmentLatencySensor.get().record(value);
}

public void recordMinPartitionsPerConsumer(int count) {
minPartitionsPerConsumer.record(count);
minPartitionsPerConsumer.get().record(count);
}

public void recordMaxPartitionsPerConsumer(int count) {
maxPartitionsPerConsumer.record(count);
maxPartitionsPerConsumer.get().record(count);
}

public void recordAvgPartitionsPerConsumer(int count) {
avgPartitionsPerConsumer.record(count);
avgPartitionsPerConsumer.get().record(count);
}

public void recordOffsetLagIsAbsent() {
Expand All @@ -190,6 +192,6 @@ public void recordByteSizePerPoll(double count) {
}

public void recordConsumerIdleTime(double time) {
idleTimeSensor.record(time);
idleTimeSensor.get().record(time);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.davinci.ingestion.main.MainIngestionStorageMetadataService;
import com.linkedin.venice.stats.AbstractVeniceStats;
import com.linkedin.venice.utils.lazy.Lazy;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
import io.tehuti.metrics.stats.Gauge;
Expand All @@ -14,23 +15,23 @@ public class MetadataUpdateStats extends AbstractVeniceStats {
private static final String METRICS_PREFIX = "ingestion_isolation_metadata_updates";

// Number of remaining elements inside metadata update queue.
private final Sensor metadataUpdateQueueLengthSensor;
private final Lazy<Sensor> metadataUpdateQueueLengthSensor;
// If we encountered unknown exception during metadata update, we will set the Gauge value to 1
private final Sensor metadataUpdateQueueErrorSensor;
private final Lazy<Sensor> metadataUpdateQueueErrorSensor;

public MetadataUpdateStats(MetricsRepository metricsRepository) {
super(metricsRepository, METRICS_PREFIX);
metadataUpdateQueueLengthSensor = registerSensor("queue_length", new Gauge());
metadataUpdateQueueErrorSensor = registerSensor("queue_update_error", new Gauge());
metadataUpdateQueueLengthSensor = registerLazySensor("queue_length", new Gauge());
metadataUpdateQueueErrorSensor = registerLazySensor("queue_update_error", new Gauge());
// Reset metadata update queue error Gauge.
recordMetadataQueueUpdateError(0.0);
}

public void recordMetadataUpdateQueueLength(int queueLength) {
metadataUpdateQueueLengthSensor.record(queueLength);
metadataUpdateQueueLengthSensor.get().record(queueLength);
}

public void recordMetadataQueueUpdateError(double value) {
metadataUpdateQueueErrorSensor.record(value);
metadataUpdateQueueErrorSensor.get().record(value);
}
}
Loading
Loading