[server][dvc] Server request/response handling overhaul (#1152)
This refactors many parts of server request and response handling, including
how metrics are recorded.

The significant functional changes are:

1. Parallel batch gets no longer have a race condition in the way metrics
   are recorded. Previously, the metrics which aggregate a value for all
   keys in the query could be missing data points and thus under-report due
   to this race.

2. The configs to enable and tune parallel batch gets now apply to compute
   as well, which was not supported before.

3. K/V size profiling is now always enabled for single gets, since it is
   considered cheap enough for that workload. The config still controls
   whether it is enabled or not for batch get and compute.

Non-functional changes include:

1. The elimination of locking in parallel query processing.

2. The serialization of responses to bytes now happens inside each subtask,
   rather than sequentially after all subtasks are completed. The serialized
   chunks are then presented as a CompositeByteBuf, which may result in
   smaller allocations since there is no need for one large byte[] holding
   the whole response anymore (see the first sketch after this list).

3. The objects used to record stats are now specialized for each query type
   (single get, batch get, compute) as well as for whether K/V size
   profiling is enabled or not, leading to a reduction of memory footprint
   for workloads that don’t need certain stats. See the class hierarchy
   details below for more info.

4. As much as possible, when some behavior is fixed for the whole duration
   of the server’s runtime, a property is allocated to hold the closure to
   be executed according to the relevant config, rather than evaluating the
   immutable config in the hot path (see the second sketch after this
   list). Similarly, as much branching as possible was removed from
   ChunkingUtils and related classes.
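
To illustrate non-functional change 2, here is a minimal sketch of how
per-subtask buffers can be stitched into one logical response without one
large byte[] (the class and method names are illustrative, not the actual
server code):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.util.List;

public final class CompositeResponseSketch {
  /**
   * Each subtask serializes its own slice of the response into a ByteBuf.
   * Rather than copying every slice into one big byte[], the slices are
   * composed into a single logical buffer backed by the original components.
   */
  static ByteBuf composeResponse(List<ByteBuf> subtaskBuffers) {
    CompositeByteBuf composite = Unpooled.compositeBuffer(subtaskBuffers.size());
    for (ByteBuf component : subtaskBuffers) {
      // 'true' advances the writer index so the component's bytes are readable.
      composite.addComponent(true, component);
    }
    return composite;
  }
}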
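
And for non-functional change 4, a minimal sketch of binding a closure to a
field once, based on an immutable config, so the hot path never re-evaluates
the config (names here are illustrative assumptions):

import java.util.function.IntConsumer;

public final class HotPathClosureSketch {
  // Chosen once at construction time; the hot path performs no config branching.
  private final IntConsumer keySizeRecorder;

  HotPathClosureSketch(boolean sizeProfilingEnabled) {
    this.keySizeRecorder = sizeProfilingEnabled
        ? size -> recordKeySize(size) // real recording logic
        : size -> {}; // no-op when profiling is disabled
  }

  void onKeyRead(int keySize) {
    // Hot path: a single call through the pre-bound closure.
    keySizeRecorder.accept(keySize);
  }

  private static void recordKeySize(int size) {
    // e.g. accumulate into a stats container
  }
}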

Below are some of the implementation details for achieving the above changes,
as well as for cleaning up some tech debt.

The ReadResponse is now split into two interfaces, each of which has its own
hierarchy of implementations:

- ReadResponse, which holds the state required to achieve the functional
  goals of the server, as seen by the querier (i.e. populating the payload
  and headers). This is now moved from the DVC module to the server module.
  The following subclass hierarchy exists:

  - AbstractReadResponse: holds common functional state and behavior for all
    read requests.

    - SingleGetResponseWrapper: renamed from StorageResponseObject.

    - MultiKeyResponseWrapper: used for single-threaded batch get requests
      as well as subtasks of parallel batch get requests.

      - ComputeResponseWrapper: used for single-threaded compute requests as
        well as subtasks of parallel compute requests.

    - ParallelMultiKeyResponseWrapper: used in parallel batch get & compute
      requests to hold an array of MultiKeyResponseWrapper, each of which
      is used exclusively by one subtask. This class is then responsible
      for aggregating the results of all these wrappers (see the sketch
      after these hierarchy details).

- ReadResponseStats: holds the state required for recording metrics, after
  the response is flushed. This replaces the ReadResponse parameter passed
  into all chunking-related utilities, thus more clearly constraining the
  role of that parameter in those classes. The purpose of this interface is
  to accumulate stats, but it doesn’t offer any API for reading or using
  those stats (see below for how that part is done). The following subclass
  hierarchy exists:

  - AbstractReadResponseStats: holds the common metrics state reported for
    all read responses. The class also implements ReadResponseStatsRecorder,
    a new interface which is responsible for recording the accumulated
    metric state. Each subclass is responsible for defining the recording
    logic pertaining to the extra state it carries.

    - MultiKeyResponseStats: holds the record count.

      - MultiGetResponseStatsWithSizeProfiling: holds K/V sizes for each
        record.

    - ComputeResponseStats: holds nine primitives used exclusively in read
      compute stats.

      - ComputeResponseStatsWithSizeProfiling: holds K/V sizes for each
        record.

  - NoOpReadResponseStats: a singleton intended to be passed into the
    various ChunkingUtils APIs by code paths which do not need any stats
    recorded via this interface. This allows the ChunkingUtils code to
    assume this parameter is not null and systematically call whatever
    APIs it needs, rather than null-checking on the hot path.
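
To make the lock-free parallel aggregation concrete, here is a minimal
sketch of the idea behind ParallelMultiKeyResponseWrapper: one wrapper slot
per subtask, so subtasks never share mutable state (the field and method
names are simplified assumptions, not the actual implementation):

import java.util.function.IntFunction;

public final class ParallelWrapperSketch<W> {
  // One wrapper per subtask; each subtask writes only to its own slot,
  // so no locking is needed while the subtasks run.
  private final W[] chunks;

  @SuppressWarnings("unchecked")
  ParallelWrapperSketch(int subtaskCount, IntFunction<W> wrapperFactory) {
    this.chunks = (W[]) new Object[subtaskCount];
    for (int i = 0; i < subtaskCount; i++) {
      this.chunks[i] = wrapperFactory.apply(i);
    }
  }

  /** Returns the wrapper that subtask i uses exclusively. */
  W getChunk(int i) {
    return this.chunks[i];
  }

  // After all subtasks complete, a single thread walks the slots and
  // aggregates their stats and payloads, so no data point is lost to a race.
}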

Miscellaneous main code changes:

- MultiKeyRouterRequestWrapper::getKeys now returns a List<K> rather than
  an Iterable<K>.

- DaVinciBackend::getStoreOrThrow now makes use of CHM::computeIfAbsent,
  rather than synchronized. This helped stabilize an integration test.

- Muted a common stacktrace in IsolatedIngestionUtils.

- ThinClientMetaStoreBasedRepository::getStoreMetaValue now uses a lower
  per-attempt timeout with retries, rather than a single request with the
  default 10-second timeout. Total time spent should be about the same
  (see the sketch after this list).

- Refactored ReplicationMetadataRocksDBStoragePartition so that the
  getReplicationMetadata API takes a ByteBuffer param, rather than byte[],
  to bring it in line with the RocksDBStoragePartition::get API.

- Discovered and fixed an unrelated bug in ChunkingUtils'
  getValueAndSchemaIdFromStorage API, where it would carry the
  ChunkValueManifest's special schema ID rather than the real value's
  schema ID when chunking kicks in.
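
For the getStoreMetaValue change, the retry shape is roughly as follows (a
hedged sketch: the interface, attempt count, and timeout values are
assumptions, not the actual Venice code):

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class RetryWithLowerTimeoutSketch {
  interface Fetcher<T> {
    T fetch(long timeout, TimeUnit unit) throws Exception;
  }

  /**
   * Instead of one attempt with a long timeout, make several attempts with a
   * lower per-attempt timeout. The worst-case total stays roughly the same,
   * but one slow attempt no longer consumes the entire budget.
   * Assumes attempts >= 1.
   */
  static <T> T fetchWithRetries(Fetcher<T> fetcher, int attempts, Duration perAttemptTimeout) throws Exception {
    Exception lastFailure = null;
    for (int i = 0; i < attempts; i++) {
      try {
        return fetcher.fetch(perAttemptTimeout.toMillis(), TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        lastFailure = e; // retry only on timeout; other failures propagate
      }
    }
    throw lastFailure;
  }
}

With, say, five attempts of two seconds each, the worst case stays at the
original ten-second budget.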
FelixGV committed Sep 5, 2024
1 parent 38da506 commit a0d5909
Showing 61 changed files with 1,986 additions and 1,177 deletions.
@@ -533,13 +533,8 @@ public synchronized void close() {
     }
   }
 
-  public synchronized StoreBackend getStoreOrThrow(String storeName) {
-    StoreBackend storeBackend = storeByNameMap.get(storeName);
-    if (storeBackend == null) {
-      storeBackend = new StoreBackend(this, storeName);
-      storeByNameMap.put(storeName, storeBackend);
-    }
-    return storeBackend;
+  public StoreBackend getStoreOrThrow(String storeName) {
+    return storeByNameMap.computeIfAbsent(storeName, s -> new StoreBackend(this, s));
   }
 
   ScheduledExecutorService getExecutor() {
@@ -5,6 +5,7 @@
 import static com.linkedin.venice.ConfigKeys.SERVER_STOP_CONSUMPTION_TIMEOUT_IN_SECONDS;
 
 import com.linkedin.davinci.config.VeniceStoreVersionConfig;
+import com.linkedin.davinci.listener.response.NoOpReadResponseStats;
 import com.linkedin.davinci.notifier.DaVinciPushStatusUpdateTask;
 import com.linkedin.davinci.storage.chunking.AbstractAvroChunkingAdapter;
 import com.linkedin.davinci.store.AbstractStorageEngine;
@@ -215,7 +216,7 @@ public <V> V read(
         reusableValue,
         binaryDecoder,
         version.isChunkingEnabled(),
-        null,
+        NoOpReadResponseStats.SINGLETON,
         readerSchemaId,
         storeDeserializerCache,
         compressor.get());
@@ -242,7 +243,7 @@ public GenericRecord compute(
         reusableValueRecord,
         binaryDecoder,
         version.isChunkingEnabled(),
-        null,
+        NoOpReadResponseStats.SINGLETON,
         readerSchemaId,
         storeDeserializerCache,
         compressor.get());
@@ -296,7 +297,6 @@ public void onCompletion(Optional<Exception> exception) {
         reusableBinaryDecoder,
         keyRecordDeserializer,
         this.version.isChunkingEnabled(),
-        null,
         getSupersetOrLatestValueSchemaId(),
         this.storeDeserializerCache,
         this.compressor.get(),
@@ -343,10 +343,13 @@ public static String executeShellCommand(String command) {
     } catch (Exception e) {
       if (command.contains("kill")) {
         throw new VeniceException("Encountered exception when executing shell command: " + command, e);
+      } else if (e.getMessage().contains("No such file or directory")) {
+        // This is a common case, so we mute the full exception stacktrace.
+        LOGGER.info("Command not found. Exception message: {}", e.getMessage());
       } else {
         LOGGER.info("Encounter exception when executing shell command: {}", command, e);
-        return "";
       }
+      return "";
     }
   }
 
@@ -350,7 +350,7 @@ byte[] getRmdWithValueSchemaByteBufferFromStorage(
       long currentTimeForMetricsMs) {
     final long lookupStartTimeInNS = System.nanoTime();
     ValueRecord result = SingleGetChunkingAdapter
-        .getReplicationMetadata(getStorageEngine(), partition, key, isChunked(), null, rmdManifestContainer);
+        .getReplicationMetadata(getStorageEngine(), partition, key, isChunked(), rmdManifestContainer);
     getHostLevelIngestionStats().recordIngestionReplicationMetadataLookUpLatency(
         LatencyUtils.getElapsedTimeFromNSToMS(lookupStartTimeInNS),
         currentTimeForMetricsMs);
@@ -19,6 +19,7 @@
 import com.linkedin.davinci.config.VeniceStoreVersionConfig;
 import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel;
 import com.linkedin.davinci.ingestion.LagType;
+import com.linkedin.davinci.listener.response.NoOpReadResponseStats;
 import com.linkedin.davinci.schema.merge.CollectionTimestampMergeRecordHelper;
 import com.linkedin.davinci.schema.merge.MergeRecordHelper;
 import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
@@ -3207,7 +3208,7 @@ private GenericRecord readStoredValueRecord(
         isChunked,
         null,
         null,
-        null,
+        NoOpReadResponseStats.SINGLETON,
         readerValueSchemaID,
         storeDeserializerCache,
         compressor.get(),
@@ -0,0 +1,83 @@
+package com.linkedin.davinci.listener.response;
+
+public class NoOpReadResponseStats implements ReadResponseStats {
+  public static final NoOpReadResponseStats SINGLETON = new NoOpReadResponseStats();
+
+  private NoOpReadResponseStats() {
+  }
+
+  @Override
+  public long getCurrentTimeInNanos() {
+    return 0;
+  }
+
+  @Override
+  public void addDatabaseLookupLatency(long startTimeInNanos) {
+
+  }
+
+  @Override
+  public void addReadComputeLatency(double latency) {
+
+  }
+
+  @Override
+  public void addReadComputeDeserializationLatency(double latency) {
+
+  }
+
+  @Override
+  public void addReadComputeSerializationLatency(double latency) {
+
+  }
+
+  @Override
+  public void addKeySize(int size) {
+
+  }
+
+  @Override
+  public void addValueSize(int size) {
+
+  }
+
+  @Override
+  public void addReadComputeOutputSize(int size) {
+
+  }
+
+  @Override
+  public void incrementDotProductCount(int count) {
+
+  }
+
+  @Override
+  public void incrementCountOperatorCount(int count) {
+
+  }
+
+  @Override
+  public void incrementCosineSimilarityCount(int count) {
+
+  }
+
+  @Override
+  public void incrementHadamardProductCount(int count) {
+
+  }
+
+  @Override
+  public void setStorageExecutionSubmissionWaitTime(double storageExecutionSubmissionWaitTime) {
+
+  }
+
+  @Override
+  public void setStorageExecutionQueueLen(int storageExecutionQueueLen) {
+
+  }
+
+  @Override
+  public void incrementMultiChunkLargeValueCount() {
+
+  }
+}
@@ -2,207 +2,40 @@
 
 import com.linkedin.venice.compression.CompressionStrategy;
 import io.netty.buffer.ByteBuf;
-import it.unimi.dsi.fastutil.ints.IntList;
 
 
 /**
- * This class is used to store common fields shared by various read responses.
+ * This is used to store common fields shared by various read responses. It is intended to store state required to
+ * fulfill the service's goals from the perspective of external callers (e.g. the Router or Fast Client).
+ *
+ * See also {@link #getStats()} which returns the container used to store state exclusively used by metrics.
  */
-public abstract class ReadResponse {
-  private double databaseLookupLatency = -1;
-  private double readComputeLatency = -1;
-  private double readComputeDeserializationLatency = -1;
-  private double readComputeSerializationLatency = -1;
-  private double storageExecutionSubmissionWaitTime;
-  private int storageExecutionQueueLen = -1;
-  private int multiChunkLargeValueCount = 0;
-  private CompressionStrategy compressionStrategy = CompressionStrategy.NO_OP;
-  private boolean isStreamingResponse = false;
-  private IntList keySizeList;
-  private IntList valueSizeList;
-  private int valueSize = 0;
-  private int readComputeOutputSize = 0;
-  private int dotProductCount = 0;
-  private int cosineSimilarityCount = 0;
-  private int hadamardProductCount = 0;
-  private int countOperatorCount = 0;
-  private int rcu = 0;
+public interface ReadResponse {
+  ReadResponseStats getStats();
 
-  public void setCompressionStrategy(CompressionStrategy compressionStrategy) {
-    this.compressionStrategy = compressionStrategy;
-  }
+  void setCompressionStrategy(CompressionStrategy compressionStrategy);
 
-  public void setStreamingResponse() {
-    this.isStreamingResponse = true;
-  }
+  void setStreamingResponse();
 
-  public boolean isStreamingResponse() {
-    return this.isStreamingResponse;
-  }
+  boolean isStreamingResponse();
 
-  public CompressionStrategy getCompressionStrategy() {
-    return compressionStrategy;
-  }
-
-  public void setDatabaseLookupLatency(double latency) {
-    this.databaseLookupLatency = latency;
-  }
-
-  public void addDatabaseLookupLatency(double latency) {
-    this.databaseLookupLatency += latency;
-  }
-
-  public double getDatabaseLookupLatency() {
-    return this.databaseLookupLatency;
-  }
-
-  public void setReadComputeLatency(double latency) {
-    this.readComputeLatency = latency;
-  }
-
-  public void addReadComputeLatency(double latency) {
-    this.readComputeLatency += latency;
-  }
-
-  public double getReadComputeLatency() {
-    return this.readComputeLatency;
-  }
-
-  public void setReadComputeDeserializationLatency(double latency) {
-    this.readComputeDeserializationLatency = latency;
-  }
-
-  public void addReadComputeDeserializationLatency(double latency) {
-    this.readComputeDeserializationLatency += latency;
-  }
-
-  public void setKeySizeList(IntList keySizeList) {
-    this.keySizeList = keySizeList;
-  }
-
-  public void setValueSizeList(IntList valueSizeList) {
-    this.valueSizeList = valueSizeList;
-  }
-
-  public double getReadComputeDeserializationLatency() {
-    return this.readComputeDeserializationLatency;
-  }
-
-  public void setReadComputeSerializationLatency(double latency) {
-    this.readComputeSerializationLatency = latency;
-  }
-
-  public void addReadComputeSerializationLatency(double latency) {
-    this.readComputeSerializationLatency += latency;
-  }
-
-  public void addValueSize(int size) {
-    this.valueSize += size;
-  }
-
-  public int getValueSize() {
-    return valueSize;
-  }
-
-  public void addReadComputeOutputSize(int size) {
-    this.readComputeOutputSize += size;
-  }
-
-  public int getReadComputeOutputSize() {
-    return readComputeOutputSize;
-  }
-
-  public void incrementDotProductCount(int count) {
-    dotProductCount += count;
-  }
-
-  public void incrementCountOperatorCount(int count) {
-    countOperatorCount += count;
-  }
-
-  public void incrementCosineSimilarityCount(int count) {
-    cosineSimilarityCount += count;
-  }
-
-  public void incrementHadamardProductCount(int count) {
-    hadamardProductCount += count;
-  }
-
-  public double getReadComputeSerializationLatency() {
-    return this.readComputeSerializationLatency;
-  }
-
-  public double getStorageExecutionHandlerSubmissionWaitTime() {
-    return storageExecutionSubmissionWaitTime;
-  }
-
-  public void setStorageExecutionSubmissionWaitTime(double storageExecutionSubmissionWaitTime) {
-    this.storageExecutionSubmissionWaitTime = storageExecutionSubmissionWaitTime;
-  }
+  CompressionStrategy getCompressionStrategy();
 
   /**
    * Set the read compute unit (RCU) cost for this response's request
    * @param rcu
    */
-  public void setRCU(int rcu) {
-    this.rcu = rcu;
-  }
+  void setRCU(int rcu);
 
   /**
    * Get the read compute unit (RCU) for this response's request
    * @return
   */
-  public int getRCU() {
-    return this.rcu;
-  }
-
-  public int getStorageExecutionQueueLen() {
-    return storageExecutionQueueLen;
-  }
-
-  public void setStorageExecutionQueueLen(int storageExecutionQueueLen) {
-    this.storageExecutionQueueLen = storageExecutionQueueLen;
-  }
-
-  public void incrementMultiChunkLargeValueCount() {
-    multiChunkLargeValueCount++;
-  }
-
-  public int getMultiChunkLargeValueCount() {
-    return multiChunkLargeValueCount;
-  }
-
-  public boolean isFound() {
-    return true;
-  }
-
-  public IntList getKeySizeList() {
-    return keySizeList;
-  }
-
-  public IntList getValueSizeList() {
-    return valueSizeList;
-  }
-
-  public int getDotProductCount() {
-    return dotProductCount;
-  }
-
-  public int getCosineSimilarityCount() {
-    return cosineSimilarityCount;
-  }
-
-  public int getHadamardProductCount() {
-    return hadamardProductCount;
-  }
-
-  public int getCountOperatorCount() {
-    return countOperatorCount;
-  }
+  int getRCU();
 
-  public abstract int getRecordCount();
+  boolean isFound();
 
-  public abstract ByteBuf getResponseBody();
+  ByteBuf getResponseBody();
 
-  public abstract int getResponseSchemaIdHeader();
+  int getResponseSchemaIdHeader();
 }