Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Delete stale index routing files. #14191

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -297,4 +298,15 @@ protected void doStart() {
@Override
protected void doStop() {}

@Override
public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException {
try {
logger.debug(() -> "Deleting stale index routing files from remote - " + stalePaths);
blobStoreRepository.blobStore().blobContainer(BlobPath.cleanPath()).deleteBlobsIgnoringIfNotExists(stalePaths);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Failed to delete some stale index routing paths from {}", stalePaths), e);
throw e;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,9 @@ protected void doStop() {
protected void doClose() throws IOException {
// noop
}

@Override
public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException {
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,6 @@ List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndicesRouting
List<String> indicesRoutingToDelete
);

public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Strings;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobMetadata;
Expand Down Expand Up @@ -72,8 +73,13 @@ public class RemoteClusterStateCleanupManager implements Closeable {
private final ThreadPool threadpool;
private final ClusterApplierService clusterApplierService;
private RemoteManifestManager remoteManifestManager;
private final RemoteRoutingTableService remoteRoutingTableService;

public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) {
public RemoteClusterStateCleanupManager(
RemoteClusterStateService remoteClusterStateService,
ClusterService clusterService,
RemoteRoutingTableService remoteRoutingTableService
) {
this.remoteClusterStateService = remoteClusterStateService;
this.remoteStateStats = remoteClusterStateService.getStats();
ClusterSettings clusterSettings = clusterService.getClusterSettings();
Expand All @@ -83,6 +89,7 @@ public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterS
// initialize with 0, a cleanup will be done when this node is elected master node and version is incremented more than threshold
this.lastCleanupAttemptStateVersion = 0;
clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval);
this.remoteRoutingTableService = remoteRoutingTableService;
}

void start() {
Expand Down Expand Up @@ -170,6 +177,7 @@ void deleteClusterMetadata(
Set<String> staleManifestPaths = new HashSet<>();
Set<String> staleIndexMetadataPaths = new HashSet<>();
Set<String> staleGlobalMetadataPaths = new HashSet<>();
Set<String> staleIndexRoutingPaths = new HashSet<>();
activeManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager.fetchRemoteClusterMetadataManifest(
clusterName,
Expand All @@ -192,6 +200,10 @@ void deleteClusterMetadata(
.values()
.forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename()));
}
if (clusterMetadataManifest.getIndicesRouting() != null) {
clusterMetadataManifest.getIndicesRouting()
.forEach(uploadedIndicesRouting -> filesToKeep.add(uploadedIndicesRouting.getUploadedFilename()));
}
});
staleManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager.fetchRemoteClusterMetadataManifest(
Expand Down Expand Up @@ -221,6 +233,19 @@ void deleteClusterMetadata(
.filter(file -> filesToKeep.contains(file) == false)
.forEach(staleGlobalMetadataPaths::add);
}
if (clusterMetadataManifest.getIndicesRouting() != null) {
clusterMetadataManifest.getIndicesRouting().forEach(uploadedIndicesRouting -> {
if (!filesToKeep.contains(uploadedIndicesRouting.getUploadedFilename())) {
staleIndexRoutingPaths.add(uploadedIndicesRouting.getUploadedFilename());
logger.debug(
() -> new ParameterizedMessage(
"Indices routing paths in stale manifest: {}",
uploadedIndicesRouting.getUploadedFilename()
)
);
}
});
}

clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
String fileName = RemoteClusterStateUtils.getFormattedIndexFileName(uploadedIndexMetadata.getUploadedFilename());
Expand All @@ -238,6 +263,15 @@ void deleteClusterMetadata(
deleteStalePaths(new ArrayList<>(staleGlobalMetadataPaths));
deleteStalePaths(new ArrayList<>(staleIndexMetadataPaths));
deleteStalePaths(new ArrayList<>(staleManifestPaths));
try {
remoteRoutingTableService.deleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths));
} catch (IOException e) {
logger.error(
() -> new ParameterizedMessage("Error while deleting stale index routing files {}", staleIndexRoutingPaths),
e
);
remoteStateStats.indexRoutingFilesCleanupAttemptFailed();
}
} catch (IllegalStateException e) {
logger.error("Error while fetching Remote Cluster Metadata manifests", e);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ public RemoteClusterStateService(
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
this.remoteStateStats = new RemotePersistenceStats();
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService);
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(repositoriesService, settings, clusterSettings);
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
*/
public class RemotePersistenceStats extends PersistedStateStats {
static final String CLEANUP_ATTEMPT_FAILED_COUNT = "cleanup_attempt_failed_count";
static final String INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT = "index_routing_files_cleanup_attempt_failed_count";
static final String REMOTE_UPLOAD = "remote_upload";
private AtomicLong cleanupAttemptFailedCount = new AtomicLong(0);

private AtomicLong indexRoutingFilesCleanupAttemptFailedCount = new AtomicLong(0);

public RemotePersistenceStats() {
super(REMOTE_UPLOAD);
addToExtendedFields(CLEANUP_ATTEMPT_FAILED_COUNT, cleanupAttemptFailedCount);
addToExtendedFields(INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT, indexRoutingFilesCleanupAttemptFailedCount);
}

public void cleanUpAttemptFailed() {
Expand All @@ -34,4 +38,12 @@ public void cleanUpAttemptFailed() {
public long getCleanupAttemptFailedCount() {
return cleanupAttemptFailedCount.get();
}

public void indexRoutingFilesCleanupAttemptFailed() {
indexRoutingFilesCleanupAttemptFailedCount.incrementAndGet();
}

public long getIndexRoutingFilesCleanupAttemptFailedCount() {
return indexRoutingFilesCleanupAttemptFailedCount.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@
import org.junit.Before;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_FILE_PREFIX;
import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN;
Expand All @@ -65,6 +67,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.startsWith;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand All @@ -91,14 +94,12 @@ public void setup() {
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository")
.put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false)
.build();

blobStoreRepository = mock(BlobStoreRepository.class);
when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor());
blobStore = mock(BlobStore.class);
blobContainer = mock(BlobContainer.class);
when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository);
when(blobStoreRepository.blobStore()).thenReturn(blobStore);

Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);

Expand Down Expand Up @@ -552,4 +553,28 @@ private BlobPath getPath() {
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64
);
}

public void testDeleteStaleIndexRoutingPaths() throws IOException {
doNothing().when(blobContainer).deleteBlobsIgnoringIfNotExists(any());
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
List<String> stalePaths = Arrays.asList("path1", "path2");
remoteRoutingTableService.doStart();
remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths);
verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths);
}

public void testDeleteStaleIndexRoutingPathsThrowsIOException() throws IOException {
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
List<String> stalePaths = Arrays.asList("path1", "path2");
// Simulate an IOException
doThrow(new IOException("test exception")).when(blobContainer).deleteBlobsIgnoringIfNotExists(Mockito.anyList());

remoteRoutingTableService.doStart();
IOException thrown = assertThrows(IOException.class, () -> {
remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths);
});
assertEquals("test exception", thrown.getMessage());
verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths);
}

}
Loading
Loading