Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding unfollow action in ism to invoke stop replication for ccr #1198

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

aggarwalShivani
Copy link
Contributor

Issue #, if available: #726

This is to add support for unfollow feature in ism.
It depends on two PRs already raised and under-review in common-utils project as well as CCR project. Detailed information about the proposed solution is explained there.

Description of changes:

  1. Imported replication utilities from common-utils project
  2. Added a new action "unfollow" for ism policies. It invokes the stop-replication utility from common-utils.
  3. Added one integration test - (with installation of ccr plugin in testClusters.integTest )

CheckList:

  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: aggarwalShivani <shivani.aggarwal@nokia.com>
@aggarwalShivani
Copy link
Contributor Author

Hi,

  1. As the other prior two PRs in common-utils and ccr are not yet merged, the checks on CI would not go through. Also, in build.gradle, locally placed artifacts for these projects are being used. I shall remove those changes once the artifacts are merged and available in repo.
  2. I've also added a UT (AttemptUnfollowStepsTest.kt) - but that is WIP as its not working correctly, as I've discussed with @bowenlan-amzn. I've tried two approaches to mock the response value, by using mockStatic (to mock ReplicationInterface) and mock spy (to mock AttemptUnfollowStep.performStopAction() alone) but havent been successful.
    Would like to hear if there any suggestions in this area.

Looking forward for the review, Thanks!

@aggarwalShivani
Copy link
Contributor Author

Hi Reviewers,
As suggested by @bowenlan-amzn, I'm sharing some details on how one can bring up the environment with ccr and ism for this use-case. I've done this for a linux environment.

Environment setup details

1. Setup Opensearch

$ git clone https://github.com/opensearch-project/OpenSearch.git 
$ cd Opensearch
### Ensure JAVA_HOME, PATH are set. 
$ ./gradlew assemble
$ export OPENSEARCH_HOME=<opensearch-home-dir>   ## where you wish to setup Opensearch
$ export OPENSEARCH_BUILD=distribution/archives/linux-tar/build/install/opensearch-$(./gradlew properties -q | grep -E '^version:' | awk '{print $2}')
$ cp -Rf $OPENSEARCH_BUILD/* $OPENSEARCH_HOME

(Above steps can be avoided if you already have opensearch setup)
Installation of plugins:

 $ cd $OPENSEARCH_HOME
 $ ./bin/opensearch-plugin install <location-of-opensearch-job-scheduler-3.0.0.0-SNAPSHOT.zip>
 $ ./bin/opensearch-plugin install <location-of-opensearch-cross-cluster-replication-3.0.0.0-SNAPSHOT.zip>
 $ ./bin/opensearch-plugin install <location-of-opensearch-index-management-3.0.0.0-SNAPSHOT.zip>

For ex. bin/opensearch-plugin install file:///home/username/ccr/opensearch-cross-cluster-replication-3.0.0.0-SNAPSHOT.zip

2. Setup Configs
We need to bring up one cluster as the leader and other as the follower (that replicates the leader indices).
This small snippet is a function to generate opensearch.yml config files for the two clusters. Run this from $OPENSEARCH_HOME location.

configure_opensearch() {
	 role=$1
	 mkdir -p ccrtest/$role/data/  mkdir -p ccrtest/$role/logs
	 cp config/opensearch.yml config/opensearch-$role.yml
	 echo "
	 plugins.security.disabled: true
	 cluster.name: ${role}Cluster
	 path.data: $OPENSEARCH_HOME/ccrtest/${role}/data
	 path.logs: $OPENSEARCH_HOME/ccrtest/{role}/logs
	
	 " >> config/opensearch-{role}.yml
}
	
// Invoke the function twice. It generates files opensearch-leader.yml and  opensearch-follower.yml at $OPENSEARCH_HOME/config.
configure_opensearch "leader"
configure_opensearch "follower"

(This is using the default opensearch.yml file and just adding minimal required configs for CCR)

  1. Run Opensearch clusters
    Leader cluster as 127.0.0.1:9200, follower as 127.0.0.1:9201 using the generated config files respectively in step 2.

  2. Start Replication using CCR REST APIs
    Create an index in leader cluster, and setup replication in follower cluster.

  3. Start ISM
    Create an ism policy to run the action "unfollow" on the desired indices.

For steps 3-5, I've added the steps in a shell script for convenience - test-unfollow-script.txt. (Changed the extension to .txt as .sh files cannot be uploaded here)

I hope that helps : )

Signed-off-by: aggarwalShivani <shivani.aggarwal@nokia.com>
@bowenlan-amzn bowenlan-amzn self-assigned this Jul 6, 2024
it,
)
}*/
val response = performStopAction(context.client as NodeClient, stopIndexReplicationRequestObj)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read the CCR doc and see if we stop relication, we won't be able to resume anymore.
In ISM, probably the normal workflow is after the leader index rollover, we can then stop the replication

I'm wondering how do we know that so to prevent early stopping the replication.

not requiring this since probably we can just mention this caveat in the documentation, and give a long waiting time before stop relication in the follower cluster

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, replication cannot be resumed on an index after stopping (or pausing more than 12h).

Hmm, the major use-case we had identified in ISM+CCR case was - say ISM is setup in both leader and follower clusters, for deletion and other housekeeping operations in the respective clusters.
In the follower cluster, even if the user has setup a policy and intends to delete some-pattern* indices, it would not be allowed as they would be still read-only due to ongoing replication, which needs to be stopped first.

So in such cases, users could chain the actions to be preceeded by stop-replication first (before any other write actions).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood.

I'm just a little worried about one part: the follower index cannot know whether leader index finishes indexing, so it may stop relication early unexpectedly

but this can be waited for community feedback whether it's needed

@@ -206,6 +209,7 @@ dependencies {
implementation "org.jetbrains:annotations:13.0"
implementation project(path: ":${rootProject.name}-spi", configuration: 'shadow')
implementation "org.opensearch:common-utils:${common_utils_version}"
// implementation(files("libs/common-utils-3.0.0.0-SNAPSHOT.jar"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

/**
* ISM action to stop replication on indices replicated on a follower cluster.
*/
class UnfollowAction(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably want to use stop relication, which is the name of the API we have https://opensearch.org/docs/latest/tuning-your-cluster/replication-plugin/api/#stop-replication

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, do you mean the name of the action in ism to be renamed? for ex. {"policy":{"actions":[{"stop_replication":{}}], .., "transitions":[]}}

Sure 👍 With that, all the files could be renamed as StopReplicationAction.kt, AttemptStopReplicationStep.kt, ValidateStopReplication.kt etc. Shall I go ahead with these names?

However, it would be tricky 🤔 to name the classes in ccr now - as the action-name indices:admin/plugins/replication/index/unfollow and TransportUnfollowIndexReplicationAction need to be renamed accordingly, to distinguish them from the existing stop REST API.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure 👍 With that, all the files could be renamed as StopReplicationAction.kt, AttemptStopReplicationStep.kt, ValidateStopReplication.kt etc. Shall I go ahead with these names?

yes

However, it would be tricky 🤔 to name the classes in ccr now - as the action-name indices:admin/plugins/replication/index/unfollow and TransportUnfollowIndexReplicationAction need to be renamed accordingly, to distinguish them from the existing stop REST API.

may be InternalTransportStopIndexReplicationAction 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may be InternalTransportStopIndexReplicationAction 😅

Yes, this also aligns with the action name we've decided "indices:internal/plugins/replication/index/stop". Thank you so much 🙂 I'll make these changes in subsequent pushes..

Comment on lines +30 to +37
/*val response: AcknowledgedResponse =
ReplicationPluginInterface.suspendUntil {
this.stopReplication(
context.client as NodeClient,
stopIndexReplicationRequestObj,
it,
)
}*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure 👍

I had added this to get some suggestions on writing mock-uts for AttemptUnfollowStep.kt as I have tried with two different approaches using a function performStopAction() and directly invoking ReplicationPluginInterface.suspendUntil, but to no success.

Do you have any suggestions on either partially mocking AttempUnfollowStep or mocking ReplicationPluginInterface.stopReplication in this case?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering did you try mock the client behavior like this, I suppose it would be pretty similar since both return acknowledge response

private fun getIndicesAdminClient(
rolloverResponse: RolloverResponse?,
aliasResponse: AcknowledgedResponse?,
rolloverException: Exception?,
aliasException: Exception?,
): IndicesAdminClient {
assertTrue(
"Must provide one and only one response or exception",
(rolloverResponse != null).xor(rolloverException != null),
)
assertTrue(
"Must provide one and only one response or exception",
(aliasResponse != null).xor(aliasException != null),
)
return mock {
doAnswer { invocationOnMock ->
val listener = invocationOnMock.getArgument<ActionListener<AcknowledgedResponse>>(1)
if (rolloverResponse != null) {
listener.onResponse(rolloverResponse)
} else {
listener.onFailure(rolloverException)
}
}.whenever(this.mock).rolloverIndex(any(), any())
doAnswer { invocationOnMock ->
val listener = invocationOnMock.getArgument<ActionListener<AcknowledgedResponse>>(1)
if (aliasResponse != null) {
listener.onResponse(aliasResponse)
} else {
listener.onFailure(aliasException)
}
}.whenever(this.mock).aliases(any(), any())
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey,
Yes I did try this. But there is a slight difference in these two classes, which is the trouble.

AttemptRolloverStep executes the rollover like this

val response: RolloverResponse = context.client.admin().indices().suspendUntil { rolloverIndex(request, it) }

In AttemptRolloverStepTests, context.client has been mocked in a way that when the rolloverIndex() is invoked by the test, it would return the value as directed by the mocked function.

But in the AttemptUnfollowStep.kt class, the execute() method invokes it like this

val response: AcknowledgedResponse =
                ReplicationPluginInterface.suspendUntil {
                    this.stopReplication(
                        context.client as NodeClient,
                        stopIndexReplicationRequestObj,
                        it,
                    )
                }

Here, I can mock the context.client and other params, but the test needs to mock ReplicationPluginInterface and ReplicationPluginInterface.suspendUntil to ultimately mock the response of stopReplication() function. If that does not happen, it would run the actual implementation and not the mocked one - which does not work in the UT.
I'm unable to mock ReplicationPluginInterface as it is static. Tried another approach using mock spy (to mock AttemptUnfollowStep.performStopAction() partially), and that hasn't worked either.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @bowenlan-amzn , Any thoughts here?

@@ -66,6 +66,9 @@ buildscript {
kotlin_version = System.getProperty("kotlin.version", "1.8.21")

security_plugin_version = System.getProperty("security.version", opensearch_build)
ccr_version = System.getProperty("ccr.version", opensearch_build)
ccr_build_download = 'http://localhost:8000/opensearch-cross-cluster-replication-3.0.0.0-SNAPSHOT.zip'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see we have ccr zip published https://aws.oss.sonatype.org/content/repositories/snapshots/org/opensearch/plugin/opensearch-cross-cluster-replication/3.0.0.0-SNAPSHOT/

can you follow this part

// https://aws.oss.sonatype.org/content/repositories/snapshots/org/opensearch/plugin/
opensearchPlugin "org.opensearch.plugin:opensearch-job-scheduler:${job_scheduler_version}@zip"
opensearchPlugin "org.opensearch.plugin:opensearch-notifications-core:${notifications_version}@zip"
opensearchPlugin "org.opensearch.plugin:notifications:${notifications_version}@zip"
opensearchPlugin "org.opensearch.plugin:opensearch-security:${security_plugin_version}@zip"
to pull

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes thank you, I have followed this and will push this in the next commit.

ccr_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot +  '/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-cross-cluster-replication-' + ccr_no_snapshot + '.zip'
...
dependencies {
    opensearchPlugin "org.opensearch.plugin:opensearch-cross-cluster-replication:${ccr_version}@zip" 
}
...
def ccrFile = resolvePluginFile("opensearch-cross-cluster-replication")
testClusters.integTest {
plugin(provider(ccrFile))

However, for now, as my changes in ccr repo are still not merged, I cannot test the integration with the zip taken from repo, and am using the local zip itself.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants