diff --git a/pom.xml b/pom.xml index 2200609..c2e9e2a 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.instaclustr icarus - 1.0.5 + 1.0.6 instaclustr-icarus Sidecar for Apache Cassandra @@ -13,7 +13,7 @@ 1.3.13 - 1.0.4 + 1.0.5 6.14.3 2.23.4 diff --git a/src/main/java/com/instaclustr/icarus/Icarus.java b/src/main/java/com/instaclustr/icarus/Icarus.java index f2ea748..397568e 100644 --- a/src/main/java/com/instaclustr/icarus/Icarus.java +++ b/src/main/java/com/instaclustr/icarus/Icarus.java @@ -16,6 +16,8 @@ import com.instaclustr.esop.impl.backup.BackupModules; import com.instaclustr.esop.impl.backup.BackupModules.BackupModule; import com.instaclustr.esop.impl.backup.BackupModules.CommitlogBackupModule; +import com.instaclustr.esop.impl.hash.HashModule; +import com.instaclustr.esop.impl.hash.HashSpec; import com.instaclustr.esop.impl.restore.RestoreModules; import com.instaclustr.esop.impl.restore.RestoreModules.RestorationStrategyModule; import com.instaclustr.esop.impl.restore.RestoreModules.RestoreCommitlogModule; @@ -65,6 +67,9 @@ public final class Icarus extends CLIApplication implements Callable { @Mixin public CassandraJMXSpec jmxSpec; + @Mixin + public HashSpec hashSpec; + @Spec private CommandSpec commandSpec; @@ -94,7 +99,7 @@ public Void call() throws Exception { logCommandVersionInformation(commandSpec); // production binds singletons as eager by default - final Injector injector = createInjector(PRODUCTION, getModules(icarusSpec, jmxSpec, enableTruncateOperation)); + final Injector injector = createInjector(PRODUCTION, getModules(icarusSpec, jmxSpec, hashSpec, enableTruncateOperation)); return injector.getInstance(Application.class).call(); } @@ -104,12 +109,13 @@ public String getImplementationTitle() { return "instaclustr-icarus"; } - public List getModules(SidecarSpec icarusSpec, - CassandraJMXSpec jmxSpec, + public List getModules(final SidecarSpec icarusSpec, + final CassandraJMXSpec jmxSpec, + final HashSpec hashSpec, final boolean enableTruncateOperation) throws Exception { List modules = new ArrayList<>(); - modules.addAll(backupRestoreModules()); + modules.addAll(backupRestoreModules(hashSpec)); modules.addAll(operationModules()); modules.addAll(icarusModules(icarusSpec, jmxSpec)); @@ -154,7 +160,7 @@ protected void configure() { }}; } - public static List backupRestoreModules() { + public static List backupRestoreModules(final HashSpec hashSpec) { return new ArrayList() {{ add(new StorageModules()); add(new BackupModule()); @@ -164,6 +170,7 @@ public static List backupRestoreModules() { add(new RestorationStrategyModule()); add(new BackupModules.UploadingModule()); add(new RestoreModules.DownloadingModule()); + add(new HashModule(hashSpec)); }}; } diff --git a/src/main/java/com/instaclustr/icarus/coordination/IcarusBackupOperationCoordinator.java b/src/main/java/com/instaclustr/icarus/coordination/IcarusBackupOperationCoordinator.java index 763ac4d..2745efc 100644 --- a/src/main/java/com/instaclustr/icarus/coordination/IcarusBackupOperationCoordinator.java +++ b/src/main/java/com/instaclustr/icarus/coordination/IcarusBackupOperationCoordinator.java @@ -19,6 +19,7 @@ import com.instaclustr.cassandra.CassandraVersion; import com.instaclustr.esop.guice.BackuperFactory; import com.instaclustr.esop.guice.BucketServiceFactory; +import com.instaclustr.esop.impl.CassandraData; import com.instaclustr.esop.impl.KeyspaceTable; import com.instaclustr.esop.impl.StorageLocation; import com.instaclustr.esop.impl.backup.BackupOperation; @@ -26,6 +27,7 @@ import com.instaclustr.esop.impl.backup.Backuper; import com.instaclustr.esop.impl.backup.UploadTracker; import com.instaclustr.esop.impl.backup.coordination.BaseBackupOperationCoordinator; +import com.instaclustr.esop.impl.hash.HashSpec; import com.instaclustr.esop.topology.CassandraClusterTopology; import com.instaclustr.esop.topology.CassandraClusterTopology.ClusterTopology; import com.instaclustr.icarus.rest.IcarusClient; @@ -55,8 +57,9 @@ public IcarusBackupOperationCoordinator(final CassandraJMXService cassandraJMXSe final SidecarSpec icarusSpec, final ExecutorServiceSupplier executorServiceSupplier, final ObjectMapper objectMapper, - final UploadTracker uploadTracker) { - super(cassandraJMXService, cassandraVersionProvider, backuperFactoryMap, bucketServiceFactoryMap, objectMapper, uploadTracker); + final UploadTracker uploadTracker, + final HashSpec hashSpec) { + super(cassandraJMXService, cassandraVersionProvider, backuperFactoryMap, bucketServiceFactoryMap, objectMapper, uploadTracker, hashSpec); this.icarusSpec = icarusSpec; this.executorServiceSupplier = executorServiceSupplier; } @@ -70,7 +73,8 @@ public void coordinate(final Operation operation) { } try { - KeyspaceTable.checkEntitiesToProcess(operation.request.cassandraDirectory.resolve("data"), operation.request.entities); + CassandraData data = CassandraData.parse(operation.request.cassandraDirectory.resolve("data")); + data.setDatabaseEntitiesFromRequest(operation.request.entities); } catch (final Exception ex) { logger.error(ex.getMessage()); operation.addError(Operation.Error.from(ex)); diff --git a/src/main/java/com/instaclustr/icarus/coordination/IcarusRestoreOperationCoordinator.java b/src/main/java/com/instaclustr/icarus/coordination/IcarusRestoreOperationCoordinator.java index f0ff1e6..60cf1b2 100644 --- a/src/main/java/com/instaclustr/icarus/coordination/IcarusRestoreOperationCoordinator.java +++ b/src/main/java/com/instaclustr/icarus/coordination/IcarusRestoreOperationCoordinator.java @@ -29,6 +29,7 @@ import com.instaclustr.esop.guice.BucketServiceFactory; import com.instaclustr.esop.guice.RestorerFactory; import com.instaclustr.esop.impl.BucketService; +import com.instaclustr.esop.impl.CassandraData; import com.instaclustr.esop.impl.StorageLocation; import com.instaclustr.esop.impl.restore.RestorationPhase.RestorationPhaseType; import com.instaclustr.esop.impl.restore.RestorationStrategyResolver; @@ -89,8 +90,8 @@ public void coordinate(final Operation operation) throw /* * I receive a request - * If it is a global request, I will be coordinator - * otherwise just execute that request + * If it is a global request, I will be coordinator + * otherwise just execute that request */ // if it is not global request, there might be at most one global request running @@ -155,22 +156,39 @@ public void coordinate(final Operation operation) throw throw new IllegalStateException("ID of a running operation does not equal to ID of this restore operation!"); } - if (operation.request.restorationPhase != DOWNLOAD) { - throw new IllegalStateException(format("Restoration coordination has to start with %s phase.", DOWNLOAD)); + if (!operation.request.singlePhase && operation.request.restorationPhase != INIT) { + throw new IllegalStateException("If global request is true and singlePhase is false, restorationPhase has to be INIT"); + } + + try { + CassandraData data = CassandraData.parse(operation.request.cassandraDirectory.resolve("data")); + data.setDatabaseEntitiesFromRequest(operation.request.entities); + data.setRenamedEntitiesFromRequest(operation.request.rename); + } catch (final Exception ex) { + throw new IllegalStateException(ex.getMessage()); } try (final IcarusWrapper icarusWrapper = new IcarusWrapper(getSidecarClients())) { final IcarusWrapper oneClient = getOneClient(icarusWrapper); - final GlobalOperationProgressTracker progressTracker = new GlobalOperationProgressTracker(operation, numberOfOperations(icarusWrapper)); - - final ResultSupplier[] resultSuppliers = new ResultSupplier[]{ - () -> executePhase(new InitPhasePreparation(), operation, oneClient, progressTracker), - () -> executePhase(new DownloadPhasePreparation(), operation, icarusWrapper, progressTracker), - () -> executePhase(new TruncatePhasePreparation(), operation, oneClient, progressTracker), - () -> executePhase(new ImportingPhasePreparation(), operation, icarusWrapper, progressTracker), - () -> executePhase(new CleaningPhasePreparation(), operation, icarusWrapper, progressTracker), - }; + final GlobalOperationProgressTracker progressTracker = new GlobalOperationProgressTracker(operation, + numberOfOperations(icarusWrapper, operation.request.singlePhase)); + + ResultSupplier[] resultSuppliers; + + if (operation.request.singlePhase) { + resultSuppliers = new ResultSupplier[]{ + () -> executePhase(getPhase(operation.request.restorationPhase), operation, icarusWrapper, progressTracker) + }; + } else { + resultSuppliers = new ResultSupplier[]{ + () -> executePhase(new InitPhasePreparation(), operation, icarusWrapper, progressTracker), + () -> executePhase(new DownloadPhasePreparation(), operation, icarusWrapper, progressTracker), + () -> executePhase(new TruncatePhasePreparation(), operation, oneClient, progressTracker), + () -> executePhase(new ImportingPhasePreparation(), operation, icarusWrapper, progressTracker), + () -> executePhase(new CleaningPhasePreparation(), operation, icarusWrapper, progressTracker), + }; + } for (final ResultSupplier supplier : resultSuppliers) { supplier.getWithEx(); @@ -187,13 +205,13 @@ public void coordinate(final Operation operation) throw } } - private int numberOfOperations(final IcarusWrapper icarusWrapper) { - int operationsPerPhase = icarusWrapper.icarusClients.size(); - - // download, import and cleanup will be done on each node = num of clients * 3 - // 1 node for init phase + 1 node for truncate phase = 2 - - return operationsPerPhase * 3 + 2; + private int numberOfOperations(final IcarusWrapper icarusWrapper, final boolean singlePhase) { + final int numberOfClients = icarusWrapper.icarusClients.size(); + if (singlePhase) { + return numberOfClients; + } + // init, download, import and cleanup will be done on each node = num of clients * 4 + 1 node for truncate phase + return numberOfClients * 4 + 1; } public static class IcarusWrapper implements Closeable { @@ -273,6 +291,23 @@ void prepareBasics(final RestoreOperationRequest request, final IcarusClient cli } } + public static PhasePreparation getPhase(final RestorationPhaseType phaseType) { + switch (phaseType) { + case DOWNLOAD: + return new DownloadPhasePreparation(); + case TRUNCATE: + return new TruncatePhasePreparation(); + case IMPORT: + return new ImportingPhasePreparation(); + case CLEANUP: + return new CleaningPhasePreparation(); + case INIT: + return new InitPhasePreparation(); + default: + throw new IllegalStateException("Unable to resolve phase preparation from " + phaseType); + } + } + private static final class DownloadPhasePreparation extends PhasePreparation { @Override public RestorationPhaseType getPhaseType() { diff --git a/src/test/java/com/instaclustr/icarus/embedded/AbstractCassandraIcarusTest.java b/src/test/java/com/instaclustr/icarus/embedded/AbstractCassandraIcarusTest.java index 0177d1c..606c6ac 100644 --- a/src/test/java/com/instaclustr/icarus/embedded/AbstractCassandraIcarusTest.java +++ b/src/test/java/com/instaclustr/icarus/embedded/AbstractCassandraIcarusTest.java @@ -24,6 +24,7 @@ import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; +import com.instaclustr.esop.impl.hash.HashSpec; import com.instaclustr.icarus.Icarus; import com.instaclustr.icarus.rest.IcarusClient; import com.instaclustr.io.FileUtils; @@ -192,7 +193,9 @@ protected IcarusHolder sidecar(String jmxAddress, Integer jmxPort, String httpAd icarusSpec.httpServerAddress = new HttpServerInetSocketAddressTypeConverter().convert(httpAdddress + ":" + httpPort.toString()); icarusSpec.operationsExpirationPeriod = new Time(1L, HOURS); - List modules = new Icarus().getModules(icarusSpec, cassandraJMXSpec, true); + HashSpec hashSpec = new HashSpec(); + + List modules = new Icarus().getModules(icarusSpec, cassandraJMXSpec, hashSpec, true); Injector injector = Guice.createInjector(modules); diff --git a/src/test/java/com/instaclustr/icarus/embedded/singlenode/AbstractSingleNodeBackupFromScratchRestoreTest.java b/src/test/java/com/instaclustr/icarus/embedded/singlenode/AbstractSingleNodeBackupFromScratchRestoreTest.java index bafcf70..de49a45 100644 --- a/src/test/java/com/instaclustr/icarus/embedded/singlenode/AbstractSingleNodeBackupFromScratchRestoreTest.java +++ b/src/test/java/com/instaclustr/icarus/embedded/singlenode/AbstractSingleNodeBackupFromScratchRestoreTest.java @@ -1,5 +1,6 @@ package com.instaclustr.icarus.embedded.singlenode; +import static com.instaclustr.esop.impl.restore.RestorationPhase.RestorationPhaseType.INIT; import static com.instaclustr.esop.impl.restore.RestorationStrategy.RestorationStrategyType.IN_PLACE; import static com.instaclustr.io.FileUtils.createDirectory; import static com.instaclustr.io.FileUtils.deleteDirectory; @@ -11,7 +12,9 @@ import com.instaclustr.esop.impl.StorageLocation; import com.instaclustr.esop.impl.backup.BackupOperationRequest; +import com.instaclustr.esop.impl.restore.RestorationPhase; import com.instaclustr.esop.impl.restore.RestoreOperationRequest; +import com.instaclustr.esop.impl.retry.RetrySpec; import com.instaclustr.esop.topology.CassandraClusterTopology; import com.instaclustr.esop.topology.CassandraClusterTopology.ClusterTopology; import com.instaclustr.icarus.embedded.AbstractCassandraIcarusTest; @@ -85,7 +88,8 @@ private BackupOperationRequest createBackupRequest(final String cloud, false, // skip bucket verification null, // schema version false, // topology file, even it is false, global request does not care, it will upload it anyway - null // proxy settings + null, // proxy settings + new RetrySpec(10, RetrySpec.RetryStrategy.EXPONENTIAL, 3, true) // retry ); } @@ -132,7 +136,10 @@ private RestoreOperationRequest createRestoreOperationRequest(final String cloud false, // insecure false, // new cluster false, // skip bucket verification - null // proxy settings + null, // proxy settings + null, // rename + new RetrySpec(10, RetrySpec.RetryStrategy.EXPONENTIAL, 3, true), // retry + false ); } diff --git a/src/test/java/com/instaclustr/icarus/embedded/singlenode/BackupRestoreOperationTest.java b/src/test/java/com/instaclustr/icarus/embedded/singlenode/BackupRestoreOperationTest.java index da7ea27..0a14bd3 100644 --- a/src/test/java/com/instaclustr/icarus/embedded/singlenode/BackupRestoreOperationTest.java +++ b/src/test/java/com/instaclustr/icarus/embedded/singlenode/BackupRestoreOperationTest.java @@ -49,7 +49,8 @@ public void backupTest() throws Exception { false, // skip bucket verification null, // schemaVersion, false, // upload topology - null // proxy + null, // proxy + null // retry ); final OperationResult result = icarusHolder.icarusClient.backup(backupOperationRequest); @@ -105,7 +106,10 @@ public void backupTest() throws Exception { false, // insecure false, // newCluster false, // skipBucketVerification - null // proxy + null, // proxy + null, // rename + null, // retry + false // single ); icarusHolder.icarusClient.waitForCompleted(icarusHolder.icarusClient.restore(restoreOperationRequest)); diff --git a/src/test/java/com/instaclustr/icarus/embedded/singlenode/SingleNodeDistributedBackupRestoreTest.java b/src/test/java/com/instaclustr/icarus/embedded/singlenode/SingleNodeDistributedBackupRestoreTest.java index 4d47771..0dafbb2 100644 --- a/src/test/java/com/instaclustr/icarus/embedded/singlenode/SingleNodeDistributedBackupRestoreTest.java +++ b/src/test/java/com/instaclustr/icarus/embedded/singlenode/SingleNodeDistributedBackupRestoreTest.java @@ -1,6 +1,6 @@ package com.instaclustr.icarus.embedded.singlenode; -import static com.instaclustr.esop.impl.restore.RestorationPhase.RestorationPhaseType.DOWNLOAD; +import static com.instaclustr.esop.impl.restore.RestorationPhase.RestorationPhaseType.INIT; import static com.instaclustr.esop.impl.restore.RestorationStrategy.RestorationStrategyType.HARDLINKS; import static com.instaclustr.io.FileUtils.deleteDirectory; @@ -15,9 +15,9 @@ import com.instaclustr.esop.impl.backup.BackupOperationRequest; import com.instaclustr.esop.impl.restore.RestoreOperationRequest; import com.instaclustr.esop.impl.truncate.TruncateOperationRequest; +import com.instaclustr.icarus.embedded.AbstractCassandraIcarusTest; import com.instaclustr.icarus.rest.IcarusClient; import com.instaclustr.measure.DataRate; -import com.instaclustr.icarus.embedded.AbstractCassandraIcarusTest; import org.testng.annotations.Test; /** @@ -47,7 +47,8 @@ private BackupOperationRequest createBackupRequest(String snapshotName) { false, // skip bucket verification null, // schema version false, // topology file, even it is false, global request does not care, it will upload it anyway - null // proxy + null, // proxy + null // retry ); } @@ -64,7 +65,7 @@ private RestoreOperationRequest createRestoreOperationRequest(String schemaVersi DatabaseEntities.parse(keyspaceName), // entities false, // update cassandra yaml HARDLINKS, // restoration strategy - DOWNLOAD, // restoration phase + INIT, // restoration phase new ImportOperationRequest(null, null, downloadDir), // import false, // noDeleteTruncates false, // noDeleteDownload @@ -79,7 +80,10 @@ private RestoreOperationRequest createRestoreOperationRequest(String schemaVersi false, // insecure false, // newCluster false, // skip bucket verification - null // proxy + null, // proxy + null, // rename + null, + false ); }