From 6459378824b03d730280669e26deb1ef6cbdcaf6 Mon Sep 17 00:00:00 2001 From: aggarwalShivani Date: Sat, 29 Jun 2024 12:48:03 +0530 Subject: [PATCH 1/2] Adding unfollow action in ism to invoke stop replication for ccr Signed-off-by: aggarwalShivani --- .gitignore | 3 +- build.gradle | 27 ++++- .../indexstatemanagement/ISMActionsParser.kt | 2 + .../action/UnfollowAction.kt | 32 +++++ .../action/UnfollowActionParser.kt | 30 +++++ .../step/unfollow/AttemptUnfollowStep.kt | 110 ++++++++++++++++++ .../validation/ActionValidation.kt | 1 + .../validation/ValidateUnfollow.kt | 71 +++++++++++ .../opensearchapi/OpenSearchExtensions.kt | 12 ++ .../mappings/opendistro-ism-config.json | 3 + .../action/UnfollowActionIT.kt | 58 +++++++++ .../step/AttemptUnfollowStepTests.kt | 91 +++++++++++++++ 12 files changed, 438 insertions(+), 2 deletions(-) create mode 100755 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/UnfollowAction.kt create mode 100755 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/UnfollowActionParser.kt create mode 100755 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/unfollow/AttemptUnfollowStep.kt create mode 100755 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateUnfollow.kt create mode 100755 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/UnfollowActionIT.kt create mode 100755 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptUnfollowStepTests.kt diff --git a/.gitignore b/.gitignore index 3ea494d86..30f7ff387 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,5 @@ src/test/resources/job-scheduler/ src/test/resources/bwc/ bin/ spi/bin/ -src/test/resources/notifications* \ No newline at end of file +src/test/resources/notifications* +src/test/resources/replication/ diff --git a/build.gradle b/build.gradle index 56e49f2ea..09b3433ae 100644 --- a/build.gradle +++ b/build.gradle @@ -66,6 +66,9 @@ buildscript { kotlin_version = System.getProperty("kotlin.version", "1.8.21") security_plugin_version = System.getProperty("security.version", opensearch_build) + ccr_version = System.getProperty("ccr.version", opensearch_build) + ccr_build_download = 'http://localhost:8000/opensearch-cross-cluster-replication-3.0.0.0-SNAPSHOT.zip' + ccr_resource_folder = "src/test/resources/replication" } repositories { @@ -205,7 +208,8 @@ dependencies { implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9' implementation "org.jetbrains:annotations:13.0" implementation project(path: ":${rootProject.name}-spi", configuration: 'shadow') - implementation "org.opensearch:common-utils:${common_utils_version}" + // implementation "org.opensearch:common-utils:${common_utils_version}" + implementation(files("libs/common-utils-3.0.0.0-SNAPSHOT.jar")) implementation "com.github.seancfoley:ipaddress:5.4.1" implementation "commons-codec:commons-codec:${versions.commonscodec}" implementation "org.apache.httpcomponents:httpclient:${versions.httpclient}" @@ -215,6 +219,7 @@ dependencies { testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0" testImplementation "org.mockito:mockito-core:${versions.mockito}" + testImplementation "org.mockito:mockito-inline:4.11.0" add("ktlint", "com.pinterest.ktlint:ktlint-cli:1.1.0") { attributes { @@ -311,6 +316,7 @@ def jobSchedulerFile = resolvePluginFile("opensearch-job-scheduler") def notificationsCoreFile = resolvePluginFile("opensearch-notifications-core") def notificationsFile = resolvePluginFile("notifications") def securityPluginFile = resolvePluginFile("opensearch-security") +def ccrFile = resolvePluginFile("opensearch-cross-cluster-replication") ext.getPluginResource = { download_to_folder, download_from_src -> def src_split = download_from_src.split("/") @@ -391,6 +397,25 @@ testClusters.integTest { if (securityEnabled) { plugin(provider(securityPluginFile)) } + plugin(provider(ccrFile)) + /*plugin(provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + if (new File("$project.rootDir/$ccr_resource_folder").exists()) { + project.delete(files("$project.rootDir/$ccr_resource_folder")) + } + project.mkdir ccr_resource_folder + ant.get(src: ccr_build_download, + dest: ccr_resource_folder, + httpusecaches: false) + return fileTree(ccr_resource_folder).getSingleFile() + } + } + } + }))*/ setting 'path.repo', repo.absolutePath } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt index a4c95b461..6cfea66cd 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt @@ -24,6 +24,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.RollupActionPa import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.TransformActionParser +import org.opensearch.indexmanagement.indexstatemanagement.action.UnfollowActionParser import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry @@ -52,6 +53,7 @@ class ISMActionsParser private constructor() { ShrinkActionParser(), SnapshotActionParser(), TransformActionParser(), + UnfollowActionParser(), ) val customActionExtensionMap = mutableMapOf() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/UnfollowAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/UnfollowAction.kt new file mode 100755 index 000000000..171ecc68a --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/UnfollowAction.kt @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.opensearch.indexmanagement.indexstatemanagement.step.unfollow.AttemptUnfollowStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.Action +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext + +/** + * ISM action to stop replication on indices replicated on a follower cluster. + */ +class UnfollowAction( + index: Int, +) : Action(name, index) { + companion object { + const val name = "unfollow" + } + + private val attemptUnfollowStep = AttemptUnfollowStep() + + private val steps = listOf(attemptUnfollowStep) + + override fun getStepToExecute(context: StepContext): Step { + return attemptUnfollowStep + } + + override fun getSteps(): List = steps +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/UnfollowActionParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/UnfollowActionParser.kt new file mode 100755 index 000000000..7cb757cab --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/UnfollowActionParser.kt @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.indexmanagement.spi.indexstatemanagement.Action +import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser + +class UnfollowActionParser : ActionParser() { + override fun fromStreamInput(sin: StreamInput): Action { + val index = sin.readInt() + return UnfollowAction(index) + } + + override fun fromXContent(xcp: XContentParser, index: Int): Action { + ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp) + + return UnfollowAction(index) + } + + override fun getActionType(): String { + return UnfollowAction.name + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/unfollow/AttemptUnfollowStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/unfollow/AttemptUnfollowStep.kt new file mode 100755 index 000000000..c1985e7fa --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/unfollow/AttemptUnfollowStep.kt @@ -0,0 +1,110 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step.unfollow + +import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.replication.ReplicationPluginInterface +import org.opensearch.commons.replication.action.StopIndexReplicationRequest +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.snapshots.SnapshotInProgressException +import org.opensearch.transport.RemoteTransportException +class AttemptUnfollowStep : Step(name) { + private val logger = LogManager.getLogger(javaClass) + private var stepStatus = StepStatus.STARTING + private var info: Map? = null + + override suspend fun execute(): Step { + val context = this.context ?: return this + val indexName = context.metadata.index + try { + val stopIndexReplicationRequestObj = StopIndexReplicationRequest(indexName) + /*val response: AcknowledgedResponse = + ReplicationPluginInterface.suspendUntil { + this.stopReplication( + context.client as NodeClient, + stopIndexReplicationRequestObj, + it, + ) + }*/ + val response = performStopAction(context.client as NodeClient, stopIndexReplicationRequestObj) + if (response.isAcknowledged) { + stepStatus = StepStatus.COMPLETED + info = mapOf("message" to getSuccessMessage(indexName)) + } else { + val message = getFailedMessage(indexName) + logger.warn(message) + stepStatus = StepStatus.FAILED + info = mapOf("message" to message) + } + } catch (e: RemoteTransportException) { + val cause = ExceptionsHelper.unwrapCause(e) + if (cause is SnapshotInProgressException) { + handleSnapshotException(indexName, cause) + } else { + handleException(indexName, cause as Exception) + } + } catch (e: SnapshotInProgressException) { + handleSnapshotException(indexName, e) + } catch (e: Exception) { + handleException(indexName, e) + } + return this + } + + internal suspend fun performStopAction(client: NodeClient, request: StopIndexReplicationRequest): AcknowledgedResponse { + val response: AcknowledgedResponse = + ReplicationPluginInterface.suspendUntil { + this.stopReplication( + client, + request, + it, + ) + } + return response + } + private fun handleSnapshotException(indexName: String, e: SnapshotInProgressException) { + val message = getSnapshotMessage(indexName) + logger.warn(message, e) + stepStatus = StepStatus.CONDITION_NOT_MET + info = mapOf("message" to message) + } + + private fun handleException(indexName: String, e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() + } + + override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData { + return currentMetadata.copy( + stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), + transitionTo = null, + info = info, + ) + } + + override fun isIdempotent() = true + + companion object { + const val name = "attempt_unfollow" + + fun getFailedMessage(index: String) = "Failed to unfollow index [index=$index]" + + fun getSuccessMessage(index: String) = "Successfully unfollowed index [index=$index]" + + fun getSnapshotMessage(index: String) = "Index had snapshot in progress, retrying unfollowing [index=$index]" + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt index e26f2d32e..8ba1c439f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt @@ -33,6 +33,7 @@ class ActionValidation( "transition" -> ValidateTransition(settings, clusterService, jvmService).execute(indexName) "close" -> ValidateClose(settings, clusterService, jvmService).execute(indexName) "index_priority" -> ValidateIndexPriority(settings, clusterService, jvmService).execute(indexName) + "unfollow" -> ValidateUnfollow(settings, clusterService, jvmService).execute(indexName) // No validations for these actions at current stage. // Reason: https://github.com/opensearch-project/index-management/issues/587 "notification" -> ValidateNothing(settings, clusterService, jvmService).execute(indexName) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateUnfollow.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateUnfollow.kt new file mode 100755 index 000000000..cdfdfb5a1 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateUnfollow.kt @@ -0,0 +1,71 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.validation + +import org.apache.logging.log4j.LogManager +import org.opensearch.cluster.metadata.MetadataCreateIndexService +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate +import org.opensearch.indexmanagement.util.OpenForTesting +import org.opensearch.indices.InvalidIndexNameException +import org.opensearch.monitor.jvm.JvmService +@OpenForTesting +class ValidateUnfollow( + settings: Settings, + clusterService: ClusterService, + jvmService: JvmService, +) : Validate(settings, clusterService, jvmService) { + private val logger = LogManager.getLogger(javaClass) + + @Suppress("ReturnSuppressCount", "ReturnCount") + override fun execute(indexName: String): Validate { + // if these conditions are false, fail validation and do not execute unfollow action + if (!indexExists(indexName) || !validIndex(indexName)) { + validationStatus = ValidationStatus.FAILED + return this + } + validationMessage = getValidationPassedMessage(indexName) + return this + } + + private fun indexExists(indexName: String): Boolean { + val isIndexExists = clusterService.state().metadata.indices.containsKey(indexName) + if (!isIndexExists) { + val message = getNoIndexMessage(indexName) + logger.warn(message) + validationMessage = message + return false + } + return true + } + + private fun validIndex(indexName: String): Boolean { + val exceptionGenerator: (String, String) -> RuntimeException = { index_name, reason -> InvalidIndexNameException(index_name, reason) } + // If the index name is invalid for any reason, this will throw an exception giving the reason why in the message. + // That will be displayed to the user as the cause. + try { + MetadataCreateIndexService.validateIndexOrAliasName(indexName, exceptionGenerator) + } catch (e: Exception) { + val message = getIndexNotValidMessage(indexName) + logger.warn(message) + validationMessage = message + return false + } + return true + } + + @Suppress("TooManyFunctions") + companion object { + const val name = "validate_unfollow" + + fun getNoIndexMessage(index: String) = "No such index [index=$index] for unfollow action." + + fun getIndexNotValidMessage(index: String) = "Index [index=$index] is not valid. Abort unfollow action on it." + + fun getValidationPassedMessage(index: String) = "Unfollow action validation passed for [index=$index]" + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt index a4464c343..e9e6f6ffa 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt @@ -29,6 +29,7 @@ import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.InjectSecurity import org.opensearch.commons.authuser.User import org.opensearch.commons.notifications.NotificationsPluginInterface +import org.opensearch.commons.replication.ReplicationPluginInterface import org.opensearch.core.action.ActionListener import org.opensearch.core.action.support.DefaultShardOperationFailedException import org.opensearch.core.common.bytes.BytesReference @@ -259,6 +260,17 @@ suspend fun NotificationsPluginInterface.suspendUntil(block: NotificationsPl ) } +suspend fun ReplicationPluginInterface.suspendUntil(block: ReplicationPluginInterface.(ActionListener) -> Unit): T = + suspendCoroutine { cont -> + block( + object : ActionListener { + override fun onResponse(response: T) = cont.resume(response) + + override fun onFailure(e: Exception) = cont.resumeWithException(e) + }, + ) + } + fun Throwable.findRemoteTransportException(): RemoteTransportException? { if (this is RemoteTransportException) return this return this.cause?.findRemoteTransportException() diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 4c138a267..48a84698d 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -167,6 +167,9 @@ } } }, + "unfollow": { + "type": "object" + }, "delete": { "type": "object" }, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/UnfollowActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/UnfollowActionIT.kt new file mode 100755 index 000000000..9020881e3 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/UnfollowActionIT.kt @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase +import org.opensearch.indexmanagement.indexstatemanagement.model.Policy +import org.opensearch.indexmanagement.indexstatemanagement.model.State +import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification +import org.opensearch.indexmanagement.waitFor +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.Locale + +class UnfollowActionIT : IndexStateManagementRestTestCase() { + private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) + + fun `test unfollow on a non-replicated index`() { + val indexName = "${testIndexName}_index_1" + val policyID = "${testIndexName}_testPolicyName_1" + val actionConfig = UnfollowAction(0) + val states = + listOf( + State("UnfollowState", listOf(actionConfig), listOf()), + ) + + val policy = + Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states, + ) + createPolicy(policy, policyID) + createIndex(indexName, policyID) + + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + // Change the start time so the job will trigger in 2 seconds. + updateManagedIndexConfigStartTime(managedIndexConfig) + + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } + + // Need to wait two cycles. + // Change the start time so the job will trigger in 2 seconds. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + val metadataInfo = getExplainManagedIndexMetaData(indexName).info.toString() + assertTrue( + metadataInfo.contains("cause=No replication in progress for index:" + indexName), + ) + } + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptUnfollowStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptUnfollowStepTests.kt new file mode 100755 index 000000000..f470a0dd3 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptUnfollowStepTests.kt @@ -0,0 +1,91 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step + +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.spy +import kotlinx.coroutines.runBlocking +import org.mockito.Mockito +import org.mockito.Mockito.`when` +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.node.NodeClient +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.indexmanagement.indexstatemanagement.step.unfollow.AttemptUnfollowStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.jobscheduler.spi.utils.LockService +import org.opensearch.script.ScriptService +import org.opensearch.test.OpenSearchTestCase + +/*import com.nhaarman.mockitokotlin2.whenever +import org.opensearch.commons.replication.ReplicationPluginInterface +import org.opensearch.core.action.ActionListener +import org.opensearch.indexmanagement.opensearchapi.suspendUntil*/ +class AttemptUnfollowStepTests : OpenSearchTestCase() { + private val clusterService: ClusterService = mock() + private val scriptService: ScriptService = mock() + private val settings: Settings = Settings.EMPTY + private val lockService: LockService = LockService(mock(), clusterService) + + fun `test unfollownew step sets step status to completed when successful`() { + val mockNodeClient = Mockito.mock(NodeClient::class.java) + // val request = StopIndexReplicationRequest("test") + val successfulResponse = AcknowledgedResponse(true) + /*Mockito.mockStatic(ReplicationPluginInterface::class.java).use { + whenever( + runBlocking { + ReplicationPluginInterface.suspendUntil( + Mockito.any(), + ) + }, + ).thenAnswer { invocation -> + val actionListener = invocation.getArgument>(0) + actionListener.onResponse(successfulResponse) + successfulResponse + }*/ + runBlocking { + // val attemptUnfollowStep = AttemptUnfollowStep() + val attemptUnfollowStep = spy(AttemptUnfollowStep()) + `when`(attemptUnfollowStep.performStopAction(any(), any())).thenReturn(successfulResponse) + val managedIndexMetaData = ManagedIndexMetaData( + "test", + "indexUuid", + "policy_id", + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + ) + val context = StepContext( + managedIndexMetaData, + clusterService, + mockNodeClient, + null, + null, + scriptService, + settings, + lockService, + ) + attemptUnfollowStep.preExecute(logger, context).execute() + val updatedManagedIndexMetaData = attemptUnfollowStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) + assertEquals( + "Step status is not COMPLETED", + Step.StepStatus.COMPLETED, + updatedManagedIndexMetaData.stepMetaData?.stepStatus, + ) + } + } +} From 310fd79fcc0cffafc5035bfb161983516d245be4 Mon Sep 17 00:00:00 2001 From: aggarwalShivani Date: Mon, 1 Jul 2024 20:31:32 +0530 Subject: [PATCH 2/2] Adding unfollow action in ism to invoke stop replication for ccr Signed-off-by: aggarwalShivani --- build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 09b3433ae..43fe19cc0 100644 --- a/build.gradle +++ b/build.gradle @@ -208,8 +208,8 @@ dependencies { implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9' implementation "org.jetbrains:annotations:13.0" implementation project(path: ":${rootProject.name}-spi", configuration: 'shadow') - // implementation "org.opensearch:common-utils:${common_utils_version}" - implementation(files("libs/common-utils-3.0.0.0-SNAPSHOT.jar")) + implementation "org.opensearch:common-utils:${common_utils_version}" + // implementation(files("libs/common-utils-3.0.0.0-SNAPSHOT.jar")) implementation "com.github.seancfoley:ipaddress:5.4.1" implementation "commons-codec:commons-codec:${versions.commonscodec}" implementation "org.apache.httpcomponents:httpclient:${versions.httpclient}"