Skip to content

Commit

Permalink
[server] [controller] [admin-tool] Max Nearline Record Size Config (#…
Browse files Browse the repository at this point in the history
…1131)

Added store-level config `maxNearlineRecordSizeBytes` to allow changing the nearline limit on the Venice Server without affecting the batch limit. 🪭
  • Loading branch information
KaiSernLim authored Sep 3, 2024
1 parent c8b95ab commit fa27ef0
Show file tree
Hide file tree
Showing 24 changed files with 1,643 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask {

private final AtomicLong lastSendIngestionHeartbeatTimestamp = new AtomicLong(0);

protected final int maxRecordSizeBytes; // TODO: move into VeniceWriter when nearline jobs enforce max record size

public LeaderFollowerStoreIngestionTask(
StoreIngestionTaskFactory.Builder builder,
Store store,
Expand Down Expand Up @@ -286,9 +284,6 @@ public LeaderFollowerStoreIngestionTask(
.setPartitionCount(storeVersionPartitionCount)
.build();
this.veniceWriter = Lazy.of(() -> veniceWriterFactory.createVeniceWriter(writerOptions));
this.maxRecordSizeBytes = (store.getMaxRecordSizeBytes() < 0) // TODO: move to VeniceWriter when nearline supported
? serverConfig.getDefaultMaxRecordSizeBytes()
: store.getMaxRecordSizeBytes();
this.kafkaClusterIdToUrlMap = serverConfig.getKafkaClusterIdToUrlMap();
this.kafkaDataIntegrityValidatorForLeaders = new KafkaDataIntegrityValidator(kafkaVersionTopic);
if (builder.getVeniceViewWriterFactory() != null && !store.getViewConfigs().isEmpty()) {
Expand Down Expand Up @@ -1900,14 +1895,30 @@ protected void recordProcessedRecordStats(
}
}

/**
 * maxRecordSizeBytes (and the nearline variant) is a store-level config that defaults to -1.
 * The default value will be set fleet-wide using the default.max.record.size.bytes config on the server and controller.
 *
 * @param recordSizeLimit the store-level configured limit; non-positive means "unset"
 * @return the configured limit when positive, otherwise the server-wide default
 */
private int backfillRecordSizeLimit(int recordSizeLimit) {
return (recordSizeLimit > 0) ? recordSizeLimit : serverConfig.getDefaultMaxRecordSizeBytes();
}

/** Resolves the store-level max (batch) record size, substituting the server default when unset. */
protected int getMaxRecordSizeBytes() {
// Read the live store config each time so store updates take effect without a restart.
int configuredLimit = storeRepository.getStore(storeName).getMaxRecordSizeBytes();
return backfillRecordSizeLimit(configuredLimit);
}

/** Resolves the store-level max nearline record size, substituting the server default when unset. */
protected int getMaxNearlineRecordSizeBytes() {
// Read the live store config each time so store updates take effect without a restart.
int configuredLimit = storeRepository.getStore(storeName).getMaxNearlineRecordSizeBytes();
return backfillRecordSizeLimit(configuredLimit);
}

/**
 * Computes the ratio of an assembled record's size to the store's configured max record size.
 * Fix: the diff artifact left both the pre-change return (field read) and post-change return
 * (dynamic lookup) in the body, which is invalid Java; keep only the post-change behavior so
 * the ratio tracks live store-config updates.
 */
@Override
protected final double calculateAssembledRecordSizeRatio(long recordSize) {
return (double) recordSize / getMaxRecordSizeBytes();
}

@Override
protected final void recordAssembledRecordSizeRatio(double ratio, long currentTimeMs) {
if (maxRecordSizeBytes != VeniceWriter.UNLIMITED_MAX_RECORD_SIZE) {
if (getMaxRecordSizeBytes() != VeniceWriter.UNLIMITED_MAX_RECORD_SIZE && ratio > 0) {
hostLevelIngestionStats.recordAssembledRecordSizeRatio(ratio, currentTimeMs);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,7 @@ private MockStoreVersionConfigs setupStoreAndVersionMocks(
doReturn(1).when(mockStore).getPartitionCount();

doReturn(VeniceWriter.UNLIMITED_MAX_RECORD_SIZE).when(mockStore).getMaxRecordSizeBytes();
doReturn(VeniceWriter.UNLIMITED_MAX_RECORD_SIZE).when(mockStore).getMaxNearlineRecordSizeBytes();

doReturn(false).when(mockStore).isHybridStoreDiskQuotaEnabled();
doReturn(-1).when(mockStore).getCurrentVersion();
Expand Down
7 changes: 5 additions & 2 deletions clients/venice-admin-tool/README
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,11 @@ usage: Parameters:
-mcls,--min-compaction-lag-seconds <arg> Min compaction lag seconds for version topic of hybrid stores
-mpa,--max_poll_attempts <arg> The max amount of attempts to poll new data from a Kafka topic (should no
new data be available).
-mrsb,--max-record-size-bytes <arg> Store-level max record size in bytes. Used by VeniceWriter to fail batch
push jobs and pause consumption on nearline jobs.
-mrsb,--max-record-size-bytes <arg> Store-level max record size for VeniceWriter to determine whether to fail
batch push jobs. This setting can potentially converge with the
nearline setting in the future.
-mnrsb,--max-nearline-record-size-bytes <arg> Store-level max record size for VeniceWriter to determine whether to
pause consumption on nearline jobs with partial updates.
-mxcls,--max-compaction-lag-seconds <arg> Max compaction lag seconds for version topic of hybrid stores
-n,--storage-node <arg> Helix instance ID for a storage node, eg. lva1-app1234_1690
-nita,--non-interactive non-interactive mode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,7 @@ static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) {
longParam(cmd, Arg.MIN_COMPACTION_LAG_SECONDS, p -> params.setMinCompactionLagSeconds(p), argSet);
longParam(cmd, Arg.MAX_COMPACTION_LAG_SECONDS, p -> params.setMaxCompactionLagSeconds(p), argSet);
integerParam(cmd, Arg.MAX_RECORD_SIZE_BYTES, params::setMaxRecordSizeBytes, argSet);
integerParam(cmd, Arg.MAX_NEARLINE_RECORD_SIZE_BYTES, params::setMaxNearlineRecordSizeBytes, argSet);
booleanParam(cmd, Arg.UNUSED_SCHEMA_DELETION_ENABLED, p -> params.setUnusedSchemaDeletionEnabled(p), argSet);
booleanParam(cmd, Arg.BLOB_TRANSFER_ENABLED, p -> params.setBlobTransferEnabled(p), argSet);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,11 @@ public enum Arg {
),
MAX_RECORD_SIZE_BYTES(
"max-record-size-bytes", "mrsb", true,
"Store-level max record size in bytes. Used by VeniceWriter to fail batch push jobs and pause consumption on nearline jobs with partial update."
"Store-level max record size for VeniceWriter to determine whether to fail batch push jobs. This setting can potentially converge with the nearline setting in the future."
),
MAX_NEARLINE_RECORD_SIZE_BYTES(
"max-nearline-record-size-bytes", "mnrsb", true,
"Store-level max record size for VeniceWriter to determine whether to pause consumption on nearline jobs with partial updates."
), UNUSED_SCHEMA_DELETION_ENABLED("enable-unused-schema-deletion", "usde", true, "Enable unused schema deletion"),
PARTITION("partition", "p", true, "Partition Id"),
INTERVAL(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import static com.linkedin.venice.Arg.LARGEST_USED_VERSION_NUMBER;
import static com.linkedin.venice.Arg.LATEST_SUPERSET_SCHEMA_ID;
import static com.linkedin.venice.Arg.MAX_COMPACTION_LAG_SECONDS;
import static com.linkedin.venice.Arg.MAX_NEARLINE_RECORD_SIZE_BYTES;
import static com.linkedin.venice.Arg.MAX_RECORD_SIZE_BYTES;
import static com.linkedin.venice.Arg.MESSAGE_COUNT;
import static com.linkedin.venice.Arg.MIGRATION_PUSH_STRATEGY;
Expand Down Expand Up @@ -265,7 +266,8 @@ public enum Command {
BACKUP_VERSION_RETENTION_DAY, REPLICATION_FACTOR, NATIVE_REPLICATION_SOURCE_FABRIC, REPLICATE_ALL_CONFIGS,
ACTIVE_ACTIVE_REPLICATION_ENABLED, REGIONS_FILTER, DISABLE_META_STORE, DISABLE_DAVINCI_PUSH_STATUS_STORE,
STORAGE_PERSONA, STORE_VIEW_CONFIGS, LATEST_SUPERSET_SCHEMA_ID, MIN_COMPACTION_LAG_SECONDS,
MAX_COMPACTION_LAG_SECONDS, MAX_RECORD_SIZE_BYTES, UNUSED_SCHEMA_DELETION_ENABLED, BLOB_TRANSFER_ENABLED }
MAX_COMPACTION_LAG_SECONDS, MAX_RECORD_SIZE_BYTES, MAX_NEARLINE_RECORD_SIZE_BYTES,
UNUSED_SCHEMA_DELETION_ENABLED, BLOB_TRANSFER_ENABLED }
),
UPDATE_CLUSTER_CONFIG(
"update-cluster-config", "Update live cluster configs", new Arg[] { URL, CLUSTER },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ public static void recover(
.setMinCompactionLagSeconds(deletedStore.getMinCompactionLagSeconds())
.setMaxCompactionLagSeconds(deletedStore.getMaxCompactionLagSeconds())
.setMaxRecordSizeBytes(deletedStore.getMaxRecordSizeBytes())
.setMaxNearlineRecordSizeBytes(deletedStore.getMaxNearlineRecordSizeBytes())
.setBlobTransferEnabled(deletedStore.isBlobTransferEnabled());
System.out.println(
"Updating store: " + storeName + " in cluster: " + recoverCluster + " with params: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ public class ControllerApiConstants {
public static final String MAX_COMPACTION_LAG_SECONDS = "max_compaction_lag_seconds";

public static final String MAX_RECORD_SIZE_BYTES = "max_record_size_bytes";
public static final String MAX_NEARLINE_RECORD_SIZE_BYTES = "max_nearline_record_size_bytes";

public static final String UNUSED_SCHEMA_DELETION_ENABLED = "unused_schema_deletion_enabled";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.KEY_SCHEMA;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.LARGEST_USED_VERSION_NUMBER;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.LOCKED_STORAGE_NODE_IDS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.MAX_NEARLINE_RECORD_SIZE_BYTES;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.MAX_RECORD_SIZE_BYTES;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.NAME;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.NUM_VERSIONS_TO_PRESERVE;
Expand Down Expand Up @@ -128,7 +129,7 @@ public enum ControllerRoute {
READ_COMPUTATION_ENABLED, BACKUP_STRATEGY, AUTO_SCHEMA_REGISTER_FOR_PUSHJOB_ENABLED, INCREMENTAL_PUSH_ENABLED,
BOOTSTRAP_TO_ONLINE_TIMEOUT_IN_HOURS, HYBRID_STORE_DISK_QUOTA_ENABLED, REGULAR_VERSION_ETL_ENABLED,
FUTURE_VERSION_ETL_ENABLED, ETLED_PROXY_USER_ACCOUNT, DISABLE_META_STORE, DISABLE_DAVINCI_PUSH_STATUS_STORE,
PERSONA_NAME, MAX_RECORD_SIZE_BYTES
PERSONA_NAME, MAX_RECORD_SIZE_BYTES, MAX_NEARLINE_RECORD_SIZE_BYTES
), SET_VERSION("/set_version", HttpMethod.POST, Arrays.asList(NAME, VERSION)),
ROLLBACK_TO_BACKUP_VERSION(
"/rollback_to_backup_version", HttpMethod.POST, Collections.singletonList(NAME), REGIONS_FILTER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.LARGEST_USED_VERSION_NUMBER;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.LATEST_SUPERSET_SCHEMA_ID;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.MAX_COMPACTION_LAG_SECONDS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.MAX_NEARLINE_RECORD_SIZE_BYTES;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.MAX_RECORD_SIZE_BYTES;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.MIGRATION_DUPLICATE_STORE;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.MIN_COMPACTION_LAG_SECONDS;
Expand Down Expand Up @@ -135,6 +136,7 @@ public UpdateStoreQueryParams(StoreInfo srcStore, boolean storeMigrating) {
.setStorageNodeReadQuotaEnabled(srcStore.isStorageNodeReadQuotaEnabled())
.setBlobTransferEnabled(srcStore.isBlobTransferEnabled())
.setMaxRecordSizeBytes(srcStore.getMaxRecordSizeBytes())
.setMaxNearlineRecordSizeBytes(srcStore.getMaxNearlineRecordSizeBytes())
// TODO: This needs probably some refinement, but since we only support one kind of view type today, this is
// still easy to parse
.setStoreViews(
Expand Down Expand Up @@ -676,6 +678,14 @@ public Optional<Integer> getMaxRecordSizeBytes() {
return getInteger(MAX_RECORD_SIZE_BYTES);
}

/** Sets the store-level max nearline record size (in bytes) on this update-store request. */
public UpdateStoreQueryParams setMaxNearlineRecordSizeBytes(int maxNearlineRecordSizeBytes) {
return putInteger(MAX_NEARLINE_RECORD_SIZE_BYTES, maxNearlineRecordSizeBytes);
}

/** @return the max nearline record size param, or empty if it was not set on this request. */
public Optional<Integer> getMaxNearlineRecordSizeBytes() {
return getInteger(MAX_NEARLINE_RECORD_SIZE_BYTES);
}

public UpdateStoreQueryParams setUnusedSchemaDeletionEnabled(boolean unusedSchemaDeletionEnabled) {
return putBoolean(UNUSED_SCHEMA_DELETION_ENABLED, unusedSchemaDeletionEnabled);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,16 @@ public void setMaxRecordSizeBytes(int maxRecordSizeBytes) {
throw new UnsupportedOperationException();
}

/** Read-only view: forwards to the wrapped store's nearline max record size. */
@Override
public int getMaxNearlineRecordSizeBytes() {
return this.delegate.getMaxNearlineRecordSizeBytes();
}

/** Mutation is not permitted through this read-only wrapper. */
@Override
public void setMaxNearlineRecordSizeBytes(int maxNearlineRecordSizeBytes) {
throw new UnsupportedOperationException();
}

@Override
public void setUnusedSchemaDeletionEnabled(boolean unusedSchemaDeletionEnabled) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ static boolean isSystemStore(String storeName) {

void setMaxRecordSizeBytes(int maxRecordSizeBytes);

/** Store-level max record size (bytes) for nearline writes; presumably -1 means unset — TODO confirm against backfill logic. */
int getMaxNearlineRecordSizeBytes();

/** Sets the store-level max nearline record size in bytes. */
void setMaxNearlineRecordSizeBytes(int maxNearlineRecordSizeBytes);

void setUnusedSchemaDeletionEnabled(boolean unusedSchemaDeletionEnabled);

boolean isUnusedSchemaDeletionEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public static StoreInfo fromStore(Store store) {
storeInfo.setMinCompactionLagSeconds(store.getMinCompactionLagSeconds());
storeInfo.setMaxCompactionLagSeconds(store.getMaxCompactionLagSeconds());
storeInfo.setMaxRecordSizeBytes(store.getMaxRecordSizeBytes());
storeInfo.setMaxNearlineRecordSizeBytes(store.getMaxNearlineRecordSizeBytes());
storeInfo.setUnusedSchemaDeletionEnabled(store.isUnusedSchemaDeletionEnabled());
storeInfo.setBlobTransferEnabled(store.isBlobTransferEnabled());
return storeInfo;
Expand Down Expand Up @@ -315,6 +316,8 @@ public static StoreInfo fromStore(Store store) {

private int maxRecordSizeBytes = VeniceWriter.UNLIMITED_MAX_RECORD_SIZE;

private int maxNearlineRecordSizeBytes = VeniceWriter.UNLIMITED_MAX_RECORD_SIZE;

private boolean unusedSchemaDeletionEnabled;

private boolean blobTransferEnabled;
Expand Down Expand Up @@ -784,6 +787,14 @@ public void setMaxRecordSizeBytes(int maxRecordSizeBytes) {
this.maxRecordSizeBytes = maxRecordSizeBytes;
}

/** Store-level max nearline record size in bytes; defaults to VeniceWriter.UNLIMITED_MAX_RECORD_SIZE. */
public int getMaxNearlineRecordSizeBytes() {
return this.maxNearlineRecordSizeBytes;
}

/** Sets the store-level max nearline record size in bytes. */
public void setMaxNearlineRecordSizeBytes(int maxNearlineRecordSizeBytes) {
this.maxNearlineRecordSizeBytes = maxNearlineRecordSizeBytes;
}

public void setUnusedSchemaDeletionEnabled(boolean unusedSchemaDeletionEnabled) {
this.unusedSchemaDeletionEnabled = unusedSchemaDeletionEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,16 @@ public void setMaxRecordSizeBytes(int maxRecordSizeBytes) {
throwUnsupportedOperationException("setMaxRecordSizeBytes");
}

/** System stores read this config from the shared ZK store definition, not the per-store copy. */
@Override
public int getMaxNearlineRecordSizeBytes() {
return zkSharedStore.getMaxNearlineRecordSizeBytes();
}

/** Not supported on system stores; the shared ZK store owns this config. */
@Override
public void setMaxNearlineRecordSizeBytes(int maxNearlineRecordSizeBytes) {
throwUnsupportedOperationException("setMaxNearlineRecordSizeBytes");
}

@Override
public Store cloneStore() {
return new SystemStore(zkSharedStore.cloneStore(), systemStoreType, veniceStore.cloneStore());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ public ZKStore(Store store) {
setMinCompactionLagSeconds(store.getMinCompactionLagSeconds());
setMaxCompactionLagSeconds(store.getMaxCompactionLagSeconds());
setMaxRecordSizeBytes(store.getMaxRecordSizeBytes());
setMaxNearlineRecordSizeBytes(store.getMaxNearlineRecordSizeBytes());
setBlobTransferEnabled(store.isBlobTransferEnabled());

for (Version storeVersion: store.getVersions()) {
Expand Down Expand Up @@ -861,6 +862,16 @@ public void setMaxRecordSizeBytes(int maxRecordSizeBytes) {
this.storeProperties.maxRecordSizeBytes = maxRecordSizeBytes;
}

/** Reads the nearline max record size from the persisted (ZK-backed) store properties. */
@Override
public int getMaxNearlineRecordSizeBytes() {
return this.storeProperties.maxNearlineRecordSizeBytes;
}

/** Writes the nearline max record size into the persisted (ZK-backed) store properties. */
@Override
public void setMaxNearlineRecordSizeBytes(int maxNearlineRecordSizeBytes) {
this.storeProperties.maxNearlineRecordSizeBytes = maxNearlineRecordSizeBytes;
}

@Override
public void setUnusedSchemaDeletionEnabled(boolean unusedSchemaDeletionEnabled) {
this.storeProperties.unusedSchemaDeletionEnabled = unusedSchemaDeletionEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public enum AvroProtocolDefinition {
*
* TODO: Move AdminOperation to venice-common module so that we can properly reference it here.
*/
ADMIN_OPERATION(79, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),
ADMIN_OPERATION(80, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),

/**
* Single chunk of a large multi-chunk value. Just a bunch of bytes.
Expand Down Expand Up @@ -143,7 +143,7 @@ public enum AvroProtocolDefinition {
/**
* Value schema for metadata system store.
*/
METADATA_SYSTEM_SCHEMA_STORE(22, StoreMetaValue.class),
METADATA_SYSTEM_SCHEMA_STORE(23, StoreMetaValue.class),

/**
* Key schema for push status system store.
Expand Down
Loading

0 comments on commit fa27ef0

Please sign in to comment.