Skip to content

Commit

Permalink
[Tiered Caching] Enable serialization of IndicesRequestCache.Key (ope…
Browse files Browse the repository at this point in the history
…nsearch-project#10275)


---------

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com>
Co-authored-by: Kiran Prakash <awskiran@amazon.com>
  • Loading branch information
sgup432 and kiranprakash154 authored Jan 11, 2024
1 parent de78c7c commit e4583a4
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 145 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- [Tiered caching] Defining interfaces, listeners and extending IndicesRequestCache with Tiered cache support ([#10753]
(https://github.com/opensearch-project/OpenSearch/pull/10753))
- [Tiered caching] Enabling serialization for IndicesRequestCache key object ([#10275](https://github.com/opensearch-project/OpenSearch/pull/10275))
- [Tiered caching] Defining interfaces, listeners and extending IndicesRequestCache with Tiered cache support ([#10753](https://github.com/opensearch-project/OpenSearch/pull/10753))
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))
- Update the indexRandom function to create more segments for concurrent search tests ([10247](https://github.com/opensearch-project/OpenSearch/pull/10247))
- Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.core.index.shard;

import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
Expand All @@ -55,6 +56,8 @@ public class ShardId implements Comparable<ShardId>, ToXContentFragment, Writeab
private final int shardId;
private final int hashCode;

private final static long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(ShardId.class);

/**
* Constructs a new shard id.
* @param index the index name
Expand Down Expand Up @@ -88,6 +91,10 @@ public ShardId(StreamInput in) throws IOException {
hashCode = computeHashCode();
}

public long getBaseRamBytesUsed() {
return BASE_RAM_BYTES_USED;
}

/**
* Writes this shard id to a stream.
* @param out the stream to write to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,45 @@ public void testProfileDisableCache() throws Exception {
}
}

public void testCacheWithInvalidation() throws Exception {
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("k", "type=keyword")
.setSettings(
Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
.get()
);
indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
ensureSearchable("index");
SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
OpenSearchAssertions.assertAllSuccessful(resp);
assertThat(resp.getHits().getTotalHits().value, equalTo(1L));

assertCacheState(client, "index", 0, 1);
// Index but don't refresh
indexRandom(false, client.prepareIndex("index").setSource("k", "hello2"));
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
// Should expect hit as here as refresh didn't happen
assertCacheState(client, "index", 1, 1);

// Explicit refresh would invalidate cache
refresh();
// Hit same query again
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
// Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh)
assertCacheState(client, "index", 1, 2);
}

private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
Expand All @@ -650,6 +689,7 @@ private static void assertCacheState(Client client, String index, long expectedH
Arrays.asList(expectedHits, expectedMisses, 0L),
Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions())
);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.opensearch.core.index.shard.ShardId;

import java.io.IOException;
import java.util.Optional;
import java.util.UUID;

/**
* A {@link org.apache.lucene.index.FilterDirectoryReader} that exposes
Expand All @@ -53,11 +55,14 @@ public final class OpenSearchDirectoryReader extends FilterDirectoryReader {
private final ShardId shardId;
private final FilterDirectoryReader.SubReaderWrapper wrapper;

private final DelegatingCacheHelper delegatingCacheHelper;

private OpenSearchDirectoryReader(DirectoryReader in, FilterDirectoryReader.SubReaderWrapper wrapper, ShardId shardId)
throws IOException {
super(in, wrapper);
this.wrapper = wrapper;
this.shardId = shardId;
this.delegatingCacheHelper = new DelegatingCacheHelper(in.getReaderCacheHelper());
}

/**
Expand All @@ -70,7 +75,61 @@ public ShardId shardId() {
@Override
public CacheHelper getReaderCacheHelper() {
// safe to delegate since this reader does not alter the index
return in.getReaderCacheHelper();
return this.delegatingCacheHelper;
}

public DelegatingCacheHelper getDelegatingCacheHelper() {
return this.delegatingCacheHelper;
}

/**
* Wraps existing IndexReader cache helper which internally provides a way to wrap CacheKey.
* @opensearch.internal
*/
public class DelegatingCacheHelper implements CacheHelper {
private final CacheHelper cacheHelper;
private final DelegatingCacheKey serializableCacheKey;

DelegatingCacheHelper(CacheHelper cacheHelper) {
this.cacheHelper = cacheHelper;
this.serializableCacheKey = new DelegatingCacheKey(Optional.ofNullable(cacheHelper).map(key -> getKey()).orElse(null));
}

@Override
public CacheKey getKey() {
return this.cacheHelper.getKey();
}

public DelegatingCacheKey getDelegatingCacheKey() {
return this.serializableCacheKey;
}

@Override
public void addClosedListener(ClosedListener listener) {
this.cacheHelper.addClosedListener(listener);
}
}

/**
* Wraps internal IndexReader.CacheKey and attaches a uniqueId to it which can be eventually be used instead of
* object itself for serialization purposes.
*/
public class DelegatingCacheKey {
private final CacheKey cacheKey;
private final String uniqueId;

DelegatingCacheKey(CacheKey cacheKey) {
this.cacheKey = cacheKey;
this.uniqueId = UUID.randomUUID().toString();
}

public CacheKey getCacheKey() {
return this.cacheKey;
}

public String getId() {
return uniqueId;
}
}

@Override
Expand Down
Loading

0 comments on commit e4583a4

Please sign in to comment.