Skip to content

Commit

Permalink
Merge branch 'main' into separation_of_rt_and_vt_div
Browse files Browse the repository at this point in the history
  • Loading branch information
lluwm committed Sep 27, 2024
2 parents b520f03 + c694d96 commit 3c1b171
Show file tree
Hide file tree
Showing 34 changed files with 565 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ public ActiveActiveStoreIngestionTask(
null,
this::processActiveActiveMessage,
isWriteComputationEnabled,
isActiveActiveReplicationEnabled());
isActiveActiveReplicationEnabled(),
aggVersionedIngestionStats,
getHostLevelIngestionStats());
});
}

Expand Down Expand Up @@ -608,7 +610,6 @@ protected void processMessageAndMaybeProduceToKafka(
}
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord = consumerRecordWrapper.getMessage();
KafkaKey kafkaKey = consumerRecord.getKey();
// KafkaMessageEnvelope kafkaValue = consumerRecord.getValue();
byte[] keyBytes = kafkaKey.getKey();
final MergeConflictResultWrapper mergeConflictResultWrapper;
if (consumerRecordWrapper.getProcessedResult() != null
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.stats.HostLevelIngestionStats;
import com.linkedin.davinci.utils.ByteArrayKey;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.utils.LatencyUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -35,25 +39,36 @@ PubSubMessageProcessedResult apply(
}

// Store-version identifier handed to the constructor; also included in the batch-processing failure message.
private final String storeVersionName;
// Store name, parsed from storeVersionName in the constructor via Version.parseStoreFromKafkaTopicName.
private final String storeName;
// Version number, parsed from storeVersionName in the constructor via Version.parseVersionFromKafkaTopicName.
private final int version;
// Executor supplied by the caller for batch processing work.
private final ExecutorService batchProcessingThreadPool;
// Key-level lock manager supplied by the caller.
private final KeyLevelLocksManager lockManager;
// Feature flags captured at construction time.
private final boolean isWriteComputationEnabled;
private final boolean isActiveActiveReplicationEnabled;
// Callback supplied by the caller that performs the per-record processing.
private final ProcessingFunction processingFunction;
// Versioned (store + version) stats receiving the batch-processing request / latency / error metrics.
private final AggVersionedIngestionStats aggVersionedIngestionStats;
// Host-level stats receiving the same batch-processing metrics.
private final HostLevelIngestionStats hostLevelIngestionStats;

/**
 * Creates a batch processor for one store-version.
 *
 * <p>All arguments are stored as-is. In addition, the store name and the version number are derived
 * from {@code storeVersionName} through {@link Version#parseStoreFromKafkaTopicName} and
 * {@link Version#parseVersionFromKafkaTopicName}, so they can be passed to the versioned stats calls.
 *
 * @param storeVersionName store-version identifier; must be parseable by the {@link Version} helpers above
 * @param batchProcessingThreadPool executor retained for batch processing
 * @param lockManager key-level lock manager retained by this processor
 * @param processingFunction callback retained for processing records
 * @param isWriteComputationEnabled write-compute feature flag
 * @param isActiveActiveReplicationEnabled active-active replication feature flag
 * @param aggVersionedIngestionStats versioned stats sink for batch-processing metrics
 * @param hostLevelIngestionStats host-level stats sink for batch-processing metrics
 */
public IngestionBatchProcessor(
String storeVersionName,
ExecutorService batchProcessingThreadPool,
KeyLevelLocksManager lockManager,
ProcessingFunction processingFunction,
boolean isWriteComputationEnabled,
// NOTE(review): the next line looks like the pre-change end of the parameter list left behind by the
// diff rendering (the line after it is its replacement) — confirm against the actual file.
boolean isActiveActiveReplicationEnabled) {
boolean isActiveActiveReplicationEnabled,
AggVersionedIngestionStats aggVersionedIngestionStats,
HostLevelIngestionStats hostLevelIngestionStats) {
this.storeVersionName = storeVersionName;
this.batchProcessingThreadPool = batchProcessingThreadPool;
this.lockManager = lockManager;
this.processingFunction = processingFunction;
this.isWriteComputationEnabled = isWriteComputationEnabled;
this.isActiveActiveReplicationEnabled = isActiveActiveReplicationEnabled;
this.aggVersionedIngestionStats = aggVersionedIngestionStats;
this.hostLevelIngestionStats = hostLevelIngestionStats;

// Pre-compute the pieces of the store-version name that the versioned stats API takes separately.
this.storeName = Version.parseStoreFromKafkaTopicName(storeVersionName);
this.version = Version.parseVersionFromKafkaTopicName(storeVersionName);
}

/**
Expand Down Expand Up @@ -104,6 +119,7 @@ public List<PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope,
int kafkaClusterId,
long beforeProcessingRecordTimestampNs,
long beforeProcessingBatchRecordsTimestampMs) {
long currentTimestampInNs = System.nanoTime();
if (records.isEmpty()) {
return Collections.emptyList();
}
Expand All @@ -123,18 +139,26 @@ public List<PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope,
if (!isAllMessagesFromRTTopic.get()) {
return resultList;
}

/**
* We would like to process the messages belonging to the same key sequentially to avoid race conditions.
*/
int totalNumOfRecords = 0;
Map<ByteArrayKey, List<PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long>>> keyGroupMap =
new HashMap<>(records.size());
resultList.forEach(r -> {

for (PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long> r: resultList) {
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message = r.getMessage();
if (!message.getKey().isControlMessage()) {
ByteArrayKey byteArrayKey = ByteArrayKey.wrap(message.getKey().getKey());
keyGroupMap.computeIfAbsent(byteArrayKey, (ignored) -> new ArrayList<>()).add(r);
totalNumOfRecords++;
}
});
}
aggVersionedIngestionStats
.recordBatchProcessingRequest(storeName, version, totalNumOfRecords, System.currentTimeMillis());
hostLevelIngestionStats.recordBatchProcessingRequest(totalNumOfRecords);

List<CompletableFuture<Void>> futureList = new ArrayList<>(keyGroupMap.size());
keyGroupMap.forEach((ignored, recordsWithTheSameKey) -> {
futureList.add(CompletableFuture.runAsync(() -> {
Expand All @@ -153,7 +177,13 @@ public List<PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope,
});
try {
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).get();
double requestLatency = LatencyUtils.getElapsedTimeFromNSToMS(currentTimestampInNs);
aggVersionedIngestionStats
.recordBatchProcessingLatency(storeName, version, requestLatency, System.currentTimeMillis());
hostLevelIngestionStats.recordBatchProcessingRequestLatency(requestLatency);
} catch (Exception e) {
aggVersionedIngestionStats.recordBatchProcessingRequestError(storeName, version);
hostLevelIngestionStats.recordBatchProcessingRequestError();
throw new VeniceException(
"Failed to execute the batch processing for " + storeVersionName + " partition: "
+ partitionConsumptionState.getPartition(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,9 @@ public LeaderFollowerStoreIngestionTask(
null,
this::processMessage,
isWriteComputationEnabled,
isActiveActiveReplicationEnabled());
isActiveActiveReplicationEnabled(),
builder.getVersionedStorageIngestionStats(),
getHostLevelIngestionStats());
});
}

Expand Down Expand Up @@ -1607,7 +1609,10 @@ protected void produceToLocalKafka(
long sourceTopicOffset = consumerRecord.getOffset();
LeaderMetadataWrapper leaderMetadataWrapper = new LeaderMetadataWrapper(sourceTopicOffset, kafkaClusterId);
partitionConsumptionState.setLastLeaderPersistFuture(leaderProducedRecordContext.getPersistedToDBFuture());
long beforeProduceTimestampNS = System.nanoTime();
produceFunction.accept(callback, leaderMetadataWrapper);
getHostLevelIngestionStats()
.recordLeaderProduceLatency(LatencyUtils.getElapsedTimeFromNSToMS(beforeProduceTimestampNS));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.RedundantExceptionFilter;
import com.linkedin.venice.utils.RetryUtils;
import com.linkedin.venice.utils.SparseConcurrentList;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Timer;
Expand Down Expand Up @@ -2238,18 +2237,7 @@ protected long getTopicPartitionEndOffSet(String kafkaUrl, PubSubTopic pubSubTop
if (offsetFromConsumer >= 0) {
return offsetFromConsumer;
}
try {
return RetryUtils.executeWithMaxAttempt(() -> {
long offset = getTopicManager(kafkaUrl).getLatestOffsetCachedNonBlocking(pubSubTopic, partition);
if (offset == -1) {
throw new VeniceException("Found latest offset -1");
}
return offset;
}, 5, Duration.ofSeconds(1), Collections.singletonList(VeniceException.class));
} catch (Exception e) {
LOGGER.error("Could not find latest offset for {} even after 5 retries", pubSubTopic.getName());
return -1;
}
return getTopicManager(kafkaUrl).getLatestOffsetCached(pubSubTopic, partition);
}

protected long getPartitionOffsetLagBasedOnMetrics(String kafkaSourceAddress, PubSubTopic topic, int partition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,19 @@ public void registerTransformerErrorSensor(String storeName, int version) {
getStats(storeName, version).registerTransformerErrorSensor();
getTotalStats(storeName).registerTransformerErrorSensor();
}

/**
 * Reports one batch-processing request by forwarding it through
 * {@code recordVersionedAndTotalStat} for the given store and version.
 *
 * @param storeName store the batch belongs to
 * @param version store version the batch belongs to
 * @param size number of records in the batch
 * @param timestamp timestamp passed along with the size sample
 */
public void recordBatchProcessingRequest(String storeName, int version, int size, long timestamp) {
  recordVersionedAndTotalStat(storeName, version, ingestionStats -> {
    ingestionStats.recordBatchProcessingRequest(size, timestamp);
  });
}

/**
 * Reports one failed batch-processing request by forwarding it through
 * {@code recordVersionedAndTotalStat} for the given store and version.
 *
 * @param storeName store the failed batch belongs to
 * @param version store version the failed batch belongs to
 */
public void recordBatchProcessingRequestError(String storeName, int version) {
  recordVersionedAndTotalStat(storeName, version, ingestionStats -> {
    ingestionStats.recordBatchProcessingRequestError();
  });
}

/**
 * Reports the latency of one batch-processing request by forwarding it through
 * {@code recordVersionedAndTotalStat} for the given store and version.
 *
 * @param storeName store the batch belongs to
 * @param version store version the batch belongs to
 * @param latency latency sample to record
 * @param timestamp timestamp passed along with the latency sample
 */
public void recordBatchProcessingLatency(String storeName, int version, double latency, long timestamp) {
  recordVersionedAndTotalStat(storeName, version, ingestionStats -> {
    ingestionStats.recordBatchProcessingRequestLatency(latency, timestamp);
  });
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package com.linkedin.davinci.stats;

import static com.linkedin.davinci.stats.IngestionStats.BATCH_PROCESSING_REQUEST;
import static com.linkedin.davinci.stats.IngestionStats.BATCH_PROCESSING_REQUEST_ERROR;
import static com.linkedin.davinci.stats.IngestionStats.BATCH_PROCESSING_REQUEST_LATENCY;
import static com.linkedin.davinci.stats.IngestionStats.BATCH_PROCESSING_REQUEST_RECORDS;
import static com.linkedin.davinci.stats.IngestionStats.BATCH_PROCESSING_REQUEST_SIZE;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState;
import com.linkedin.davinci.kafka.consumer.StoreIngestionTask;
Expand Down Expand Up @@ -138,6 +144,13 @@ public class HostLevelIngestionStats extends AbstractVeniceStats {
*/
private final LongAdderRateGauge totalTombstoneCreationDCRRate;

// Registered in the constructor as "leader_produce_latency" (per-store and total, avg and max).
private final Sensor leaderProduceLatencySensor;
// The sensors below are registered in the constructor through the "only total" registration helpers,
// using the BATCH_PROCESSING_* metric names statically imported from IngestionStats.
// Recorded once per batch-processing request.
private final LongAdderRateGauge batchProcessingRequestSensor;
// Receives the per-request record count (avg and max).
private final Sensor batchProcessingRequestSizeSensor;
// Receives the per-request record count as a rate.
private final LongAdderRateGauge batchProcessingRequestRecordsSensor;
// Receives the per-request latency (avg and max).
private final Sensor batchProcessingRequestLatencySensor;
// Recorded once per failed batch-processing request.
private final LongAdderRateGauge batchProcessingRequestErrorSensor;

/**
* @param totalStats the total stats singleton instance, or null if we are constructing the total stats
*/
Expand Down Expand Up @@ -435,6 +448,37 @@ public HostLevelIngestionStats(
totalStats,
() -> totalStats.leaderIngestionActiveActiveDeleteLatencySensor,
avgAndMax());

this.leaderProduceLatencySensor = registerPerStoreAndTotalSensor(
"leader_produce_latency",
totalStats,
() -> totalStats.leaderProduceLatencySensor,
avgAndMax());
this.batchProcessingRequestSensor = registerOnlyTotalRate(
BATCH_PROCESSING_REQUEST,
totalStats,
() -> totalStats.batchProcessingRequestSensor,
time);
this.batchProcessingRequestErrorSensor = registerOnlyTotalRate(
BATCH_PROCESSING_REQUEST_ERROR,
totalStats,
() -> totalStats.batchProcessingRequestErrorSensor,
time);
this.batchProcessingRequestRecordsSensor = registerOnlyTotalRate(
BATCH_PROCESSING_REQUEST_RECORDS,
totalStats,
() -> totalStats.batchProcessingRequestRecordsSensor,
time);
this.batchProcessingRequestSizeSensor = registerOnlyTotalSensor(
BATCH_PROCESSING_REQUEST_SIZE,
totalStats,
() -> totalStats.batchProcessingRequestSizeSensor,
avgAndMax());
this.batchProcessingRequestLatencySensor = registerOnlyTotalSensor(
BATCH_PROCESSING_REQUEST_LATENCY,
totalStats,
() -> totalStats.batchProcessingRequestLatencySensor,
avgAndMax());
}

/** Record a host-level byte consumption rate across all store versions */
Expand Down Expand Up @@ -614,4 +658,22 @@ public void recordTimestampRegressionDCRError() {
/** Records one occurrence on {@code totalOffsetRegressionDCRErrorRate}. */
public void recordOffsetRegressionDCRError() {
totalOffsetRegressionDCRErrorRate.record();
}

/**
 * Records a latency sample on the leader produce latency sensor.
 *
 * @param latency latency sample to record
 */
public void recordLeaderProduceLatency(double latency) {
leaderProduceLatencySensor.record(latency);
}

/**
 * Records one batch-processing request on the host-level sensors.
 *
 * @param size number of records in the batch; fed to both the records gauge and the size sensor
 */
public void recordBatchProcessingRequest(int size) {
// One occurrence for the request itself.
batchProcessingRequestSensor.record();
// The same size value feeds the records gauge and the request-size sensor.
batchProcessingRequestRecordsSensor.record(size);
batchProcessingRequestSizeSensor.record(size);
}

/** Records one occurrence on the batch-processing request error gauge. */
public void recordBatchProcessingRequestError() {
batchProcessingRequestErrorSensor.record();
}

/**
 * Records a latency sample on the batch-processing request latency sensor.
 *
 * @param latency latency sample to record
 */
public void recordBatchProcessingRequestLatency(double latency) {
batchProcessingRequestLatencySensor.record(latency);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public class IngestionStats {
public static final String PRODUCER_CALLBACK_LATENCY = "producer_callback_latency";
public static final String LEADER_PREPROCESSING_LATENCY = "leader_preprocessing_latency";
public static final String INTERNAL_PREPROCESSING_LATENCY = "internal_preprocessing_latency";
public static final String BATCH_PROCESSING_REQUEST = "batch_processing_request";
public static final String BATCH_PROCESSING_REQUEST_SIZE = "batch_processing_request_size";
public static final String BATCH_PROCESSING_REQUEST_RECORDS = "batch_processing_request_records";
public static final String BATCH_PROCESSING_REQUEST_LATENCY = "batch_processing_request_latency";
public static final String BATCH_PROCESSING_REQUEST_ERROR = "batch_processing_request_error";

private static final MetricConfig METRIC_CONFIG = new MetricConfig();
private StoreIngestionTask ingestionTask;
Expand Down Expand Up @@ -117,6 +122,11 @@ public class IngestionStats {

private Count transformerErrorCount = new Count();
private Sensor transformerErrorSensor;
// Gauge recorded once per batch-processing request; registered under BATCH_PROCESSING_REQUEST.
private final LongAdderRateGauge batchProcessingRequestSensor = new LongAdderRateGauge();
// Receives the per-request record count; created in the constructor under BATCH_PROCESSING_REQUEST_SIZE.
private final WritePathLatencySensor batchProcessingRequestSizeSensor;
// Gauge fed with the per-request record count; registered under BATCH_PROCESSING_REQUEST_RECORDS.
private final LongAdderRateGauge batchProcessingRequestRecordsSensor = new LongAdderRateGauge();
// Receives the per-request latency; created in the constructor under BATCH_PROCESSING_REQUEST_LATENCY.
private final WritePathLatencySensor batchProcessingRequestLatencySensor;
// Gauge recorded once per failed batch-processing request; registered under BATCH_PROCESSING_REQUEST_ERROR.
private final LongAdderRateGauge batchProcessingRequestErrorSensor = new LongAdderRateGauge();

public IngestionStats(VeniceServerConfig serverConfig) {

Expand Down Expand Up @@ -207,6 +217,14 @@ public IngestionStats(VeniceServerConfig serverConfig) {
registerSensor(localMetricRepository, OFFSET_REGRESSION_DCR_ERROR, offsetRegressionDCRErrorSensor);
registerSensor(localMetricRepository, TOMBSTONE_CREATION_DCR, tombstoneCreationDCRSensor);
registerSensor(localMetricRepository, IDLE_TIME, idleTimeSensor);

registerSensor(localMetricRepository, BATCH_PROCESSING_REQUEST, batchProcessingRequestSensor);
registerSensor(localMetricRepository, BATCH_PROCESSING_REQUEST_RECORDS, batchProcessingRequestRecordsSensor);
registerSensor(localMetricRepository, BATCH_PROCESSING_REQUEST_ERROR, batchProcessingRequestErrorSensor);
batchProcessingRequestSizeSensor =
new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, BATCH_PROCESSING_REQUEST_SIZE);
batchProcessingRequestLatencySensor =
new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, BATCH_PROCESSING_REQUEST_LATENCY);
}

private void registerSensor(MetricsRepository localMetricRepository, String sensorName, LongAdderRateGauge gauge) {
Expand Down Expand Up @@ -665,6 +683,40 @@ public WritePathLatencySensor getLeaderProducerCompletionLatencySensor() {
return leaderProducerCompletionLatencySensor;
}

/**
 * Records one batch-processing request.
 *
 * @param size number of records in the batch; fed to both the records gauge and the size sensor
 * @param currentTimeMs timestamp passed to the size sensor together with {@code size}
 */
public void recordBatchProcessingRequest(int size, long currentTimeMs) {
// One occurrence for the request itself.
batchProcessingRequestSensor.record();
// The same size value feeds the records gauge and the request-size sensor.
batchProcessingRequestRecordsSensor.record(size);
batchProcessingRequestSizeSensor.record(size, currentTimeMs);
}

/** @return the rate currently reported by the batch-processing request gauge */
public double getBatchProcessingRequest() {
return batchProcessingRequestSensor.getRate();
}

/** @return the rate currently reported by the batch-processing request records gauge */
public double getBatchProcessingRequestRecords() {
return batchProcessingRequestRecordsSensor.getRate();
}

/** Records one occurrence on the batch-processing request error gauge. */
public void recordBatchProcessingRequestError() {
batchProcessingRequestErrorSensor.record();
}

/** @return the rate currently reported by the batch-processing request error gauge */
public double getBatchProcessingRequestError() {
return batchProcessingRequestErrorSensor.getRate();
}

/** @return the sensor that receives the per-request batch size samples */
public WritePathLatencySensor getBatchProcessingRequestSizeSensor() {
return batchProcessingRequestSizeSensor;
}

/**
 * Records a latency sample for one batch-processing request.
 *
 * @param latency latency sample to record
 * @param currentTimeMs timestamp passed to the latency sensor together with {@code latency}
 */
public void recordBatchProcessingRequestLatency(double latency, long currentTimeMs) {
batchProcessingRequestLatencySensor.record(latency, currentTimeMs);
}

/** @return the sensor that receives the per-request batch-processing latency samples */
public WritePathLatencySensor getBatchProcessingRequestLatencySensor() {
return batchProcessingRequestLatencySensor;
}

public static double unAvailableToZero(double value) {
/* When data is unavailable, return 0 instead of NaN or Infinity. Some metrics are initialized to -INF.
This can cause problems when metrics are aggregated. Use only when zero makes semantic sense.
Expand Down
Loading

0 comments on commit 3c1b171

Please sign in to comment.