Skip to content

Commit

Permalink
refactor: expose configDef() for all cache config sets
Browse files Browse the repository at this point in the history
This is needed for documentation purposes, while also improving the
cache config builder.
  • Loading branch information
jeqo committed Oct 1, 2024
1 parent 87ea1a5 commit 881af51
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,59 +38,21 @@ public class CacheConfig extends AbstractConfig {
private static final String CACHE_FETCH_TIMEOUT_MS_DOC = "When getting an object from the fetch, "
+ "how long to wait before timing out. Defaults to 10 sec.";

static final long CACHE_RETENTION_MS_DEFAULT = 600_000;
public static final long CACHE_RETENTION_MS_DEFAULT = 600_000;

private static ConfigDef configDef(
public CacheConfig(
final ConfigDef configDef,
final Object defaultSize,
final long defaultRetentionMs
final Map<String, ?> props
) {
configDef.define(
CACHE_SIZE_CONFIG,
ConfigDef.Type.LONG,
defaultSize,
ConfigDef.Range.between(-1L, Long.MAX_VALUE),
ConfigDef.Importance.MEDIUM,
CACHE_SIZE_DOC
);
configDef.define(
CACHE_RETENTION_CONFIG,
ConfigDef.Type.LONG,
defaultRetentionMs,
ConfigDef.Range.between(-1L, Long.MAX_VALUE),
ConfigDef.Importance.MEDIUM,
CACHE_RETENTION_DOC
);
configDef.define(
CACHE_FETCH_THREAD_POOL_SIZE_CONFIG,
ConfigDef.Type.INT,
0,
ConfigDef.Range.between(0, 1024),
ConfigDef.Importance.LOW,
CACHE_FETCH_THREAD_POOL_SIZE_DOC
);
configDef.define(
CACHE_FETCH_TIMEOUT_MS_CONFIG,
ConfigDef.Type.LONG,
Duration.ofSeconds(10).toMillis(),
ConfigDef.Range.between(1, Long.MAX_VALUE),
ConfigDef.Importance.LOW,
CACHE_FETCH_TIMEOUT_MS_DOC
);
return configDef;
super(configDef, props);
}

CacheConfig(
final ConfigDef configDef,
final Map<String, ?> props,
final Object defaultSize,
final long defaultRetentionMs
) {
super(configDef(configDef, defaultSize, defaultRetentionMs), props);
public static DefBuilder defBuilder() {
return new DefBuilder();
}

public static Builder newBuilder(final Map<String, ?> configs) {
return new Builder(configs);
public static DefBuilder defBuilder(final ConfigDef baseConfig) {
return new DefBuilder(baseConfig);
}

public Optional<Long> cacheSize() {
Expand Down Expand Up @@ -121,27 +83,64 @@ public Duration getTimeout() {
return Duration.ofMillis(getLong(CACHE_FETCH_TIMEOUT_MS_CONFIG));
}

public static class Builder {
private final Map<String, ?> props;
public static class DefBuilder {
private long defaultRetentionMs = CACHE_RETENTION_MS_DEFAULT;
private Object maybeDefaultSize = NO_DEFAULT_VALUE;

public Builder(final Map<String, ?> props) {
this.props = props;
final ConfigDef configDef;

public DefBuilder() {
this.configDef = new ConfigDef();
}

public DefBuilder(final ConfigDef baseConfig) {
this.configDef = baseConfig;
}

public Builder withDefaultSize(final long defaultSize) {
public DefBuilder withDefaultSize(final long defaultSize) {
this.maybeDefaultSize = defaultSize;
return this;
}

public Builder withDefaultRetentionMs(final long retentionMs) {
public DefBuilder withDefaultRetentionMs(final long retentionMs) {
this.defaultRetentionMs = retentionMs;
return this;
}

public CacheConfig build() {
return new CacheConfig(new ConfigDef(), props, maybeDefaultSize, defaultRetentionMs);
public ConfigDef build() {
configDef.define(
CACHE_SIZE_CONFIG,
ConfigDef.Type.LONG,
maybeDefaultSize,
ConfigDef.Range.between(-1L, Long.MAX_VALUE),
ConfigDef.Importance.MEDIUM,
CACHE_SIZE_DOC
);
configDef.define(
CACHE_RETENTION_CONFIG,
ConfigDef.Type.LONG,
defaultRetentionMs,
ConfigDef.Range.between(-1L, Long.MAX_VALUE),
ConfigDef.Importance.MEDIUM,
CACHE_RETENTION_DOC
);
configDef.define(
CACHE_FETCH_THREAD_POOL_SIZE_CONFIG,
ConfigDef.Type.INT,
0,
ConfigDef.Range.between(0, 1024),
ConfigDef.Importance.LOW,
CACHE_FETCH_THREAD_POOL_SIZE_DOC
);
configDef.define(
CACHE_FETCH_TIMEOUT_MS_CONFIG,
ConfigDef.Type.LONG,
Duration.ofSeconds(10).toMillis(),
ConfigDef.Range.between(1, Long.MAX_VALUE),
ConfigDef.Importance.LOW,
CACHE_FETCH_TIMEOUT_MS_DOC
);
return configDef;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,26 @@ public class ChunkCacheConfig extends CacheConfig {
"The amount of data that should be eagerly prefetched and cached";
private static final int CACHE_PREFETCHING_SIZE_DEFAULT = 0; //TODO find out what it should be

private static ConfigDef addCacheConfigs(final ConfigDef configDef) {
configDef.define(
public static final ConfigDef configDef(final ConfigDef baseConfig) {
baseConfig.define(
CACHE_PREFETCH_MAX_SIZE_CONFIG,
ConfigDef.Type.INT,
CACHE_PREFETCHING_SIZE_DEFAULT,
ConfigDef.Range.between(0, Integer.MAX_VALUE),
ConfigDef.Importance.MEDIUM,
CACHE_PREFETCH_MAX_SIZE_DOC
);
return configDef;
return CacheConfig.defBuilder(baseConfig)
.withDefaultRetentionMs(ChunkCacheConfig.CACHE_RETENTION_MS_DEFAULT)
.build();
}

public ChunkCacheConfig(final ConfigDef configDef, final Map<String, ?> props) {
super(addCacheConfigs(configDef), props, ConfigDef.NO_DEFAULT_VALUE, CacheConfig.CACHE_RETENTION_MS_DEFAULT);
super(configDef, props);
}

public ChunkCacheConfig(final Map<String, ?> props) {
super(configDef(new ConfigDef()), props);
}

public int cachePrefetchingSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.fetch;
package io.aiven.kafka.tieredstorage.config;

import java.util.Map;

Expand All @@ -23,30 +23,30 @@

import io.aiven.kafka.tieredstorage.config.validators.Subclass;
import io.aiven.kafka.tieredstorage.fetch.cache.ChunkCache;
import io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache;
import io.aiven.kafka.tieredstorage.fetch.cache.MemoryChunkCache;

public class ChunkManagerFactoryConfig extends AbstractConfig {

protected static final String FETCH_CHUNK_CACHE_PREFIX = "fetch.chunk.cache.";
public static final String FETCH_CHUNK_CACHE_PREFIX = "fetch.chunk.cache.";
public static final String FETCH_CHUNK_CACHE_CONFIG = FETCH_CHUNK_CACHE_PREFIX + "class";
private static final String FETCH_CHUNK_CACHE_DOC = "The fetch chunk cache implementation";

private static final ConfigDef CONFIG;

static {
CONFIG = new ConfigDef();

CONFIG.define(
FETCH_CHUNK_CACHE_CONFIG,
ConfigDef.Type.CLASS,
null,
Subclass.of(ChunkCache.class),
ConfigDef.Importance.MEDIUM,
FETCH_CHUNK_CACHE_DOC
);
private static final String FETCH_CHUNK_CACHE_DOC = "Chunk cache implementation. There are 2 implementations "
+ "included: " + MemoryChunkCache.class.getName() + " and " + DiskChunkCache.class.getName();

public static ConfigDef configDef() {
return new ConfigDef()
.define(
FETCH_CHUNK_CACHE_CONFIG,
ConfigDef.Type.CLASS,
null,
Subclass.of(ChunkCache.class),
ConfigDef.Importance.MEDIUM,
FETCH_CHUNK_CACHE_DOC
);
}

public ChunkManagerFactoryConfig(final Map<?, ?> originals) {
super(CONFIG, originals);
super(configDef(), originals);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,17 @@ public class DiskChunkCacheConfig extends ChunkCacheConfig {
public static final String TEMP_CACHE_DIRECTORY = "temp";
public static final String CACHE_DIRECTORY = "cache";

private static ConfigDef configDef() {
final ConfigDef configDef = new ConfigDef();
configDef.define(
CACHE_PATH_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Importance.HIGH,
CACHE_PATH_DOC
public static ConfigDef configDef() {
return configDef(
new ConfigDef()
.define(
CACHE_PATH_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Importance.HIGH,
CACHE_PATH_DOC
)
);
return configDef;
}

public DiskChunkCacheConfig(final Map<String, ?> props) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.kafka.common.Configurable;

import io.aiven.kafka.tieredstorage.config.ChunkManagerFactoryConfig;
import io.aiven.kafka.tieredstorage.fetch.cache.ChunkCache;
import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import java.io.InputStream;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.tieredstorage.config.ChunkCacheConfig;
import io.aiven.kafka.tieredstorage.fetch.ChunkKey;
import io.aiven.kafka.tieredstorage.fetch.ChunkManager;
Expand Down Expand Up @@ -64,7 +62,7 @@ public Weigher<ChunkKey, byte[]> weigher() {

@Override
public void configure(final Map<String, ?> configs) {
final ChunkCacheConfig config = new ChunkCacheConfig(new ConfigDef(), configs);
final ChunkCacheConfig config = new ChunkCacheConfig(configs);
this.cache = buildCache(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;

import io.aiven.kafka.tieredstorage.config.CacheConfig;
Expand Down Expand Up @@ -133,9 +134,13 @@ public InputStream get(

@Override
public void configure(final Map<String, ?> configs) {
final var config = CacheConfig.newBuilder(configs)
final var config = new CacheConfig(configDef(), configs);
this.cache = buildCache(config);
}

public static ConfigDef configDef() {
return CacheConfig.defBuilder()
.withDefaultSize(DEFAULT_MAX_SIZE_BYTES)
.build();
this.cache = buildCache(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.tieredstorage.config.CacheConfig;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
Expand Down Expand Up @@ -122,10 +124,14 @@ protected AsyncLoadingCache<ObjectKey, SegmentManifest> buildCache(final CacheCo

@Override
public void configure(final Map<String, ?> configs) {
final var config = CacheConfig.newBuilder(configs)
final var config = new CacheConfig(configDef(), configs);
this.cache = buildCache(config);
}

public static ConfigDef configDef() {
return CacheConfig.defBuilder()
.withDefaultSize(DEFAULT_MAX_SIZE)
.withDefaultRetentionMs(DEFAULT_RETENTION_MS)
.build();
this.cache = buildCache(config);
}
}
Loading

0 comments on commit 881af51

Please sign in to comment.