Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce System Repository #9088

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Replace the deprecated IndexReader APIs with new storedFields() & termVectors() ([#7792](https://github.com/opensearch-project/OpenSearch/pull/7792))
- [Remote Store] Restrict user override for remote store index level settings ([#8812](https://github.com/opensearch-project/OpenSearch/pull/8812))
- Removed blocking wait in TransportGetSnapshotsAction which was exhausting generic threadpool ([#8377](https://github.com/opensearch-project/OpenSearch/pull/8377))
- [Remote Store] Introduce System Repository for Remote Store ([#9088](https://github.com/opensearch-project/OpenSearch/pull/9088/))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.threadpool.Scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand Down Expand Up @@ -388,6 +391,15 @@ protected ByteSizeValue chunkSize() {
return chunkSize;
}

@Override
public List<Setting<?>> restrictedSystemRepositorySettings() {
List<Setting<?>> settings = new ArrayList<>();
List<Setting<?>> restrictedSystemRepositorySettings = super.restrictedSystemRepositorySettings();
settings.addAll(restrictedSystemRepositorySettings);
settings.addAll(new ArrayList<>(List.of(BUCKET_SETTING, BASE_PATH_SETTING)));
return Collections.unmodifiableList(settings);
}

@Override
protected void doClose() {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Map;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -132,6 +133,22 @@ public void testDefaultBufferSize() {
}
}

public void testCreateS3SystemRepository() {
final RepositoryMetadata metadata = new RepositoryMetadata(
"dummy-repo",
"mock",
Settings.builder()
.put(S3Repository.BASE_PATH_SETTING.getKey(), "foo/bar")
.put(S3Repository.BUCKET_SETTING.getKey(), "bucket")
.put(S3Repository.SYSTEM_REPOSITORY_SETTING.getKey(), true)
.build()
);
try (S3Repository s3repo = createS3Repo(metadata)) {
assertTrue(s3repo.isSystemRepository());
assertThat(s3repo.restrictedSystemRepositorySettings(), hasSize(4));
}
}

private S3Repository createS3Repo(RepositoryMetadata metadata) {
return new S3Repository(
metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.fs.FsRepository;
Expand All @@ -44,6 +45,7 @@

import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -108,4 +110,67 @@ public void testUpdateRepository() {
final Repository updatedRepository = repositoriesService.repository(repositoryName);
assertThat(updatedRepository, updated ? not(sameInstance(originalRepository)) : sameInstance(originalRepository));
}

public void testExternalSystemRepositoryCreationFails() {
final String repositoryName = "test-system-repo";
final Client client = client();

final Settings.Builder repoSettings = Settings.builder().put("location", randomRepoPath()).put("system_repository", true);

assertThrows(
"[" + repositoryName + "] cannot register a system repository externally",
RepositoryException.class,
() -> client.admin().cluster().preparePutRepository(repositoryName).setType(FsRepository.TYPE).setSettings(repoSettings).get()
);

assertThrows(RepositoryException.class, () -> client.admin().cluster().prepareGetRepositories(repositoryName).get());
}

public void testSystemRepositorySettingsUpdationFails() {
final InternalTestCluster cluster = internalCluster();
final String repositoryName = "test-repo";
final Client client = client();

final RepositoriesService repositoriesService = cluster.getDataOrClusterManagerNodeInstances(RepositoriesService.class)
.iterator()
.next();
final Settings.Builder repoSettings = Settings.builder().put("location", randomRepoPath());

assertAcked(
client.admin().cluster().preparePutRepository(repositoryName).setType(FsRepository.TYPE).setSettings(repoSettings).get()
);

final Settings.Builder updatedRepoSettings = Settings.builder().put("location", randomRepoPath()).put("system_repository", true);

assertThrows(
"[" + repositoryName + "] trying to modify system repository attribute for a repository",
RepositoryException.class,
() -> client.admin()
.cluster()
.preparePutRepository(repositoryName)
.setType(FsRepository.TYPE)
.setSettings(updatedRepoSettings)
.get()
);
assertThrows(RepositoryException.class, () -> repositoriesService.getSystemRepository(repositoryName));
assertThat(repositoriesService.repository(repositoryName), instanceOf(FsRepository.class));
}

public void testRestrictedSystemSettings() {
final InternalTestCluster cluster = internalCluster();
final String repositoryName = "test-repo";
final Client client = client();

final RepositoriesService repositoriesService = cluster.getDataOrClusterManagerNodeInstances(RepositoriesService.class)
.iterator()
.next();
final Settings.Builder repoSettings = Settings.builder().put("location", randomRepoPath());

assertAcked(
client.admin().cluster().preparePutRepository(repositoryName).setType(FsRepository.TYPE).setSettings(repoSettings).get()
);
List<Setting<?>> restrictedSettings = repositoriesService.repository(repositoryName).restrictedSystemRepositorySettings();
assertThat(restrictedSettings, hasSize(2));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ protected void clusterManagerOperation(
ClusterState state,
final ActionListener<AcknowledgedResponse> listener
) {
repositoriesService.registerRepository(
repositoriesService.registerOrUpdateRepository(
request,
ActionListener.delegateFailure(
listener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throw
}

public Directory newDirectory(String repositoryName, String indexUUID, String shardId) throws IOException {
// TODO switch to using {@RepositoriesService#getSystemRespository} once repository auto-bootstrap changes are in
try (Repository repository = repositoriesService.get().repository(repositoryName)) {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobPath commonBlobPath = ((BlobStoreRepository) repository).basePath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public RemoteBlobStoreInternalTranslogFactory(
) {
Repository repository;
try {
// TODO switch to using {@RepositoriesService#getSystemRespository} once repository auto-bootstrap changes are in
repository = repositoriesServiceSupplier.get().repository(repositoryName);
} catch (RepositoryMissingException ex) {
throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@
return in.isReadOnly();
}

@Override
public boolean isSystemRepository() {
return in.isSystemRepository();

Check warning on line 162 in server/src/main/java/org/opensearch/repositories/FilterRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/FilterRepository.java#L162

Added line #L162 was not covered by tests
}

@Override
public void snapshotShard(
Store store,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import java.util.stream.Stream;

import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;

/**
* Service responsible for maintaining and providing access to snapshot repositories on nodes.
Expand Down Expand Up @@ -159,7 +160,7 @@
* @param request register repository request
* @param listener register repository listener
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change the name? No parameters were modified nor was the comment altered.

public void registerRepository(final PutRepositoryRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
public void registerOrUpdateRepository(final PutRepositoryRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we try to register a new system repository, do we call this method? or that flow would be separate?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That should be a separate flow

assert lifecycle.started() : "Trying to register new repository but service is in state [" + lifecycle.state() + "]";

final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(request.name(), request.type(), request.settings());
Expand Down Expand Up @@ -210,30 +211,41 @@
RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE);
if (repositories == null) {
logger.info("put repository [{}]", request.name());
ensureNonSystemRepository(request.settings(), request.name());
repositories = new RepositoriesMetadata(
Collections.singletonList(new RepositoryMetadata(request.name(), request.type(), request.settings()))
);
} else {
boolean found = false;
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1);

RepositoryMetadata currentRepositoryMetadata = null;
for (RepositoryMetadata repositoryMetadata : repositories.repositories()) {
if (repositoryMetadata.name().equals(newRepositoryMetadata.name())) {
if (newRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)) {
// Previous version is the same as this one no update is needed.
return currentState;
} else if (SYSTEM_REPOSITORY_SETTING.get(repositoryMetadata.settings()) != SYSTEM_REPOSITORY_SETTING.get(
newRepositoryMetadata.settings()
)) {
throw new RepositoryException(
repositoryMetadata.name(),

Check warning on line 231 in server/src/main/java/org/opensearch/repositories/RepositoriesService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/RepositoriesService.java#L230-L231

Added lines #L230 - L231 were not covered by tests
"trying to modify system repository attribute for a repository"
);
}
found = true;
currentRepositoryMetadata = repositoryMetadata;
repositoriesMetadata.add(newRepositoryMetadata);
} else {
repositoriesMetadata.add(repositoryMetadata);
}
}
if (!found) {
logger.info("put repository [{}]", request.name());
ensureNonSystemRepository(newRepositoryMetadata.settings(), newRepositoryMetadata.name());
Copy link
Collaborator

@gbbafna gbbafna Aug 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move this earlier where we also do validateRepositoryMetadataSettings ? We will anyways need this check here as we want this to be atomic .

repositoriesMetadata.add(new RepositoryMetadata(request.name(), request.type(), request.settings()));
} else {
logger.info("update repository [{}]", request.name());
validateRestrictedSystemRepositorySettingsAndType(currentRepositoryMetadata, newRepositoryMetadata);
}
repositories = new RepositoriesMetadata(repositoriesMetadata);
}
Expand Down Expand Up @@ -261,6 +273,29 @@
);
}

private void validateRestrictedSystemRepositorySettingsAndType(
RepositoryMetadata currentRepositoryMetadata,
RepositoryMetadata newRepositoryMetadata
) {
if (SYSTEM_REPOSITORY_SETTING.get(currentRepositoryMetadata.settings())) {
if (newRepositoryMetadata.type() != currentRepositoryMetadata.type()) {
throw new RepositoryException(
currentRepositoryMetadata.name(),
"trying to modify system repository type " + currentRepositoryMetadata.type()

Check warning on line 284 in server/src/main/java/org/opensearch/repositories/RepositoriesService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/RepositoriesService.java#L282-L284

Added lines #L282 - L284 were not covered by tests
);
}
Repository repository = repositories.get(currentRepositoryMetadata.name());

Check warning on line 287 in server/src/main/java/org/opensearch/repositories/RepositoriesService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/RepositoriesService.java#L287

Added line #L287 was not covered by tests
for (Setting<?> setting : repository.restrictedSystemRepositorySettings()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

REMOTE_STORE_INDEX_SHALLOW_COPY setting will be in snapshot repository. for that, we need to add validation in non system repository as well. or should we make snapshot repository as well a system repository once user enables shallow snapshots on it?

if (newRepositoryMetadata.settings().get(setting.getKey()) != null) {
throw new RepositoryException(
repository.getMetadata().name(),
"trying to modify restricted system repository settings " + setting.getKey()

Check warning on line 292 in server/src/main/java/org/opensearch/repositories/RepositoriesService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/RepositoriesService.java#L290-L292

Added lines #L290 - L292 were not covered by tests
);
}
}

Check warning on line 295 in server/src/main/java/org/opensearch/repositories/RepositoriesService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/RepositoriesService.java#L295

Added line #L295 was not covered by tests
}
}

/**
* Unregisters repository in the cluster
* <p>
Expand Down Expand Up @@ -288,6 +323,7 @@
boolean changed = false;
for (RepositoryMetadata repositoryMetadata : repositories.repositories()) {
if (Regex.simpleMatch(request.name(), repositoryMetadata.name())) {
ensureNonSystemRepository(repositoryMetadata.settings(), request.name());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message would be confusing here : cannot register a system repository externally . We need to modify it .

ensureRepositoryNotInUse(currentState, repositoryMetadata.name());
logger.info("delete repository [{}]", repositoryMetadata.name());
changed = true;
Expand Down Expand Up @@ -499,6 +535,21 @@
throw new RepositoryMissingException(repositoryName);
}

/**
* Returns registered system repository
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a separate API? Shouldn't the name still be unique and retrievable via Repository repository(String repositoryName) above?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is to make sure we always use system repository for remote store .

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This design is inconsistent, the returned Repository exposed the 'isSystem' flag so this information is available to callers if they want to pick and choose.

* <p>*
* @param repositoryName repository name
* @return registered system repository
* @throws RepositoryMissingException if repository with such name isn't registered
*/
public Repository getSystemRepository(String repositoryName) {
Repository repository = repositories.get(repositoryName);

Check warning on line 546 in server/src/main/java/org/opensearch/repositories/RepositoriesService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/RepositoriesService.java#L546

Added line #L546 was not covered by tests
if (repository != null && SYSTEM_REPOSITORY_SETTING.get(repository.getMetadata().settings())) {
return repository;

Check warning on line 548 in server/src/main/java/org/opensearch/repositories/RepositoriesService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/RepositoriesService.java#L548

Added line #L548 was not covered by tests
}
throw new RepositoryMissingException(repositoryName);

Check warning on line 550 in server/src/main/java/org/opensearch/repositories/RepositoriesService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/RepositoriesService.java#L550

Added line #L550 was not covered by tests
}

public List<RepositoryStatsSnapshot> repositoriesStats() {
List<RepositoryStatsSnapshot> archivedRepoStats = repositoriesStatsArchive.getArchivedStats();
List<RepositoryStatsSnapshot> activeRepoStats = getRepositoryStatsForActiveRepositories();
Expand Down Expand Up @@ -610,6 +661,12 @@
}
}

public static void ensureNonSystemRepository(Settings repoSettings, String repository) {
if (SYSTEM_REPOSITORY_SETTING.get(repoSettings)) {
throw new RepositoryException(repository, "cannot register a system repository externally");

Check warning on line 666 in server/src/main/java/org/opensearch/repositories/RepositoriesService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/RepositoriesService.java#L666

Added line #L666 was not covered by tests
}
}

public static void validateRepositoryMetadataSettings(
ClusterService clusterService,
final String repositoryName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable;
import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.common.settings.Setting;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
Expand All @@ -55,6 +56,8 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -237,6 +240,13 @@
*/
boolean isReadOnly();

/**
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would a callers use this field? It seems like it makes handling the system'ness of a responsible of the callers, rather than the responsibility of the implementer.

* Returns true if the repository is managed by the system directly and doesn't allow managing the lifetime of the
* repository through external APIs
* @return true if the repository is system managed
*/
boolean isSystemRepository();

/**
* Creates a snapshot of the shard based on the index commit point.
* <p>
Expand Down Expand Up @@ -341,6 +351,14 @@
throw new UnsupportedOperationException();
}

/**
* Returns the list of restricted system repository settings that cannot be mutated once repository is created*
* @return the list of settings
*/
default List<Setting<?>> restrictedSystemRepositorySettings() {
return Collections.emptyList();

Check warning on line 359 in server/src/main/java/org/opensearch/repositories/Repository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/Repository.java#L359

Added line #L359 was not covered by tests
}

/**
* Retrieve shard snapshot status for the stored snapshot
*
Expand Down
Loading
Loading