diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt
index ea7ed3765..ef0d04393 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt
@@ -250,11 +250,6 @@ object ManagedIndexRunner :
     @Suppress("ReturnCount", "ComplexMethod", "LongMethod", "ComplexCondition", "NestedBlockDepth")
     private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, jobContext: JobExecutionContext) {
         logger.debug("Run job for index ${managedIndexConfig.index}")
-        // doing a check of local cluster health as we do not want to overload cluster manager node with potentially a lot of calls
-        if (clusterIsRed()) {
-            logger.debug("Skipping current execution of ${managedIndexConfig.index} because of red cluster health")
-            return
-        }
 
         val (managedIndexMetaData, getMetadataSuccess) = client.getManagedIndexMetadata(managedIndexConfig.indexUuid)
         if (!getMetadataSuccess) {
@@ -314,6 +309,17 @@ object ManagedIndexRunner :
 
         val state = policy.getStateToExecute(managedIndexMetaData)
         val action: Action? = state?.getActionToExecute(managedIndexMetaData, indexMetadataProvider)
+        // Only an explicit allow_red_cluster=true opts in; a state that could not be resolved (null)
+        // keeps the previous behaviour of skipping execution while the cluster is red.
+        val allowRedCluster = state?.allowRedCluster == true
+        // doing a check of local cluster health as we do not want to overload cluster manager node with potentially a lot of calls
+        if (clusterIsRed()) {
+            if (!allowRedCluster) {
+                logger.debug("Skipping current execution of ${managedIndexConfig.index} because of red cluster health")
+                return
+            }
+            logger.warn("Warning: ISM is running on a red cluster")
+        }
         val stepContext =
             StepContext(
                 managedIndexMetaData, clusterService, client, threadPool.threadContext, policy.user, scriptService, settings, jobContext.lockService,
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt
index e34ff41fd..fb1c2ae18 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt
@@ -25,9 +25,19 @@ import java.io.IOException
 
 data class State(
     val name: String,
+    // When true, the runner keeps executing this state even while cluster health is red.
+    val allowRedCluster: Boolean,
     val actions: List<Action>,
     val transitions: List<Transition>,
 ) : ToXContentObject, Writeable {
+
+    // Keeps the three-argument form available; allowRedCluster is false for it.
+    constructor(
+        name: String,
+        actions: List<Action>,
+        transitions: List<Transition>,
+    ) : this(name, false, actions, transitions)
+
     init {
         require(name.isNotBlank()) { "State must contain a valid name" }
         var hasDelete = false
@@ -45,6 +55,7 @@ data class State(
         builder
             .startObject()
             .field(NAME_FIELD, name)
+            .field(ALLOW_RED_CLUSTER, allowRedCluster)
             .startArray(ACTIONS_FIELD)
             .also { actions.forEach { action -> action.toXContent(it, params) } }
             .endArray()
@@ -56,6 +67,9 @@ data class State(
     @Throws(IOException::class)
     constructor(sin: StreamInput) : this(
         sin.readString(),
+        // NOTE(review): this changes the wire format unconditionally (mirrored in writeTo) - confirm
+        // whether a stream-version gate is needed for mixed-version clusters.
+        sin.readBoolean(),
         sin.readList { ISMActionsParser.instance.fromStreamInput(it) },
         sin.readList(::Transition),
     )
@@ -63,6 +77,7 @@ data class State(
     @Throws(IOException::class)
     override fun writeTo(out: StreamOutput) {
         out.writeString(name)
+        out.writeBoolean(allowRedCluster)
         out.writeList(actions)
         out.writeList(transitions)
     }
@@ -104,6 +119,7 @@ data class State(
 
     companion object {
         const val NAME_FIELD = "name"
+        const val ALLOW_RED_CLUSTER = "allow_red_cluster"
         const val ACTIONS_FIELD = "actions"
         const val TRANSITIONS_FIELD = "transitions"
 
@@ -111,6 +127,7 @@ data class State(
         @Throws(IOException::class)
         fun parse(xcp: XContentParser): State {
             var name: String? = null
+            var allowRedCluster = false
             val actions: MutableList<Action> = mutableListOf()
             val transitions: MutableList<Transition> = mutableListOf()
 
@@ -121,6 +138,7 @@ data class State(
 
                 when (fieldName) {
                     NAME_FIELD -> name = xcp.text()
+                    ALLOW_RED_CLUSTER -> allowRedCluster = xcp.booleanValue()
                     ACTIONS_FIELD -> {
                         ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
                         while (xcp.nextToken() != Token.END_ARRAY) {
@@ -139,6 +157,7 @@ data class State(
 
             return State(
                 name = requireNotNull(name) { "State name is null" },
+                allowRedCluster = allowRedCluster,
                 actions = actions.toList(),
                 transitions = transitions.toList(),
             )
diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json
index 4c138a267..a36b917e8 100644
--- a/src/main/resources/mappings/opendistro-ism-config.json
+++ b/src/main/resources/mappings/opendistro-ism-config.json
@@ -140,6 +140,9 @@
             "name": {
               "type": "keyword"
             },
+            "allow_red_cluster": {
+              "type": "boolean"
+            },
             "actions": {
               "type": "nested",
               "properties": {
diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt
index 21f27f5af..cd8e7f072 100644
--- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt
+++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt
@@ -941,6 +941,17 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
         assertEquals("Unable to delete snapshot", RestStatus.OK, response.restStatus())
     }
 
+    /**
+     * Asks cluster health to wait until the status is at least yellow (i.e. no longer red) or [timeout] elapses.
+     *
+     * Despite the name, the request waits for `yellow`, not `green`.
+     */
+    protected fun isClusterGreen(timeout: String) {
+        val endpoint = "_cluster/health?wait_for_status=yellow&timeout=$timeout"
+        val response = client().makeRequest("GET", endpoint)
+        assertEquals("Cluster status check timed out", RestStatus.OK, response.restStatus())
+    }
+
     @Suppress("UNCHECKED_CAST")
     protected fun assertSnapshotExists(
         repository: String,
diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt
index 428ce810d..d0ad49182 100644
--- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt
+++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt
@@ -5,6 +5,11 @@
 
 package org.opensearch.indexmanagement.indexstatemanagement.runner
 
+import org.apache.hc.core5.http.ContentType
+import org.apache.hc.core5.http.io.entity.StringEntity
+import org.opensearch.client.Request
+import org.opensearch.client.Response
+import org.opensearch.core.rest.RestStatus
 import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser
 import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
 import org.opensearch.indexmanagement.indexstatemanagement.action.OpenAction
@@ -224,4 +229,107 @@ class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() {
             assertEquals("Failed to update ManagedIndexConfig jitter", newJitter, currJitter)
         }
     }
+
+    fun `test runner on a red cluster with allow_red_cluster as false`() {
+        val indexName = "test-index-1"
+        val policyID = "test_red_cluster_policy"
+        val policy =
+            """
+            {"policy":{"description":"Close indices older than 5m","default_state":"active","states":[{"name":"active","allow_red_cluster":"false",
+            "actions":[],"transitions":[{"state_name":"inactivestate","conditions":{"min_index_age":"5s"}}]},{"name":"inactivestate","allow_red_cluster":"false"
+            ,"actions":[{"delete":{}}],"transitions":[]}],"ism_template":{"index_patterns":["test-index"]}}}
+            """.trimIndent()
+        createPolicyJson(policy, policyID)
+        createIndex(indexName, policyID)
+        waitFor { assertIndexExists(indexName) }
+
+        val managedIndexConfig = getExistingManagedIndexConfig(indexName)
+
+        withSimulatedRedCluster {
+            // Change the start time so the job will trigger in 2 seconds.
+            // After the job, the managed index metadata carries the policy ID.
+            updateManagedIndexConfigStartTime(managedIndexConfig)
+
+            waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }
+
+            // Change the start time so the job will trigger in 2 seconds.
+            // With allow_red_cluster=false this run is expected to be skipped, so no transition happens.
+            updateManagedIndexConfigStartTime(managedIndexConfig)
+
+            // Give the (skipped) job time to run.
+            Thread.sleep(8000L)
+
+            // Change the start time so the job will trigger in 2 seconds.
+            updateManagedIndexConfigStartTime(managedIndexConfig)
+
+            // Leave enough time for a delete to happen if the runner wrongly executed.
+            Thread.sleep(5000L)
+
+            waitFor { assertIndexExists(indexName) }
+        }
+    }
+
+    fun `test runner on a red cluster with allow_red_cluster as true`() {
+        val indexName = "test-index-2"
+        val policyID = "test_red_cluster_policy"
+        val policy =
+            """
+            {"policy":{"description":"Close indices older than 5m","default_state":"active","states":[{"name":"active","allow_red_cluster":"true",
+            "actions":[],"transitions":[{"state_name":"inactivestate","conditions":{"min_index_age":"5s"}}]},{"name":"inactivestate","allow_red_cluster":"true"
+            ,"actions":[{"delete":{}}],"transitions":[]}],"ism_template":{"index_patterns":["test-index"]}}}
+            """.trimIndent()
+        createPolicyJson(policy, policyID)
+        createIndex(indexName, policyID)
+        waitFor { assertIndexExists(indexName) }
+
+        val managedIndexConfig = getExistingManagedIndexConfig(indexName)
+
+        withSimulatedRedCluster {
+            // Change the start time so the job will trigger in 2 seconds.
+            // After the job, the index will be in "Active" State
+            updateManagedIndexConfigStartTime(managedIndexConfig)
+
+            waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }
+
+            // Change the start time so the job will trigger in 2 seconds.
+            // Index Transitions to inactivestate state
+            updateManagedIndexConfigStartTime(managedIndexConfig)
+
+            // Wait for the index to settle in "inactivestate".
+            Thread.sleep(8000L)
+
+            // Change the start time so the job will trigger in 2 seconds.
+            updateManagedIndexConfigStartTime(managedIndexConfig)
+
+            // Wait for the index deletion by the ISM job.
+            Thread.sleep(5000L)
+
+            waitFor { assertIndexDoesNotExist(indexName) }
+        }
+    }
+
+    /**
+     * Runs [block] while the cluster is red and always removes the blocking index afterwards.
+     *
+     * Red health is simulated by creating an index with an allocation requirement that presumably
+     * no test node satisfies. Cleanup sits in `finally` so a failed assertion inside [block] does
+     * not leave the cluster red.
+     */
+    private fun withSimulatedRedCluster(block: () -> Unit) {
+        val endpoint = "sim-red"
+        val jsonEntity = """{"settings":{"index.routing.allocation.require.size": "test"}}"""
+        val request = Request("PUT", endpoint)
+        request.entity = StringEntity(jsonEntity, ContentType.APPLICATION_JSON)
+        val response: Response = client().performRequest(request)
+        assertEquals("Failed to simulate red cluster", RestStatus.OK, response.restStatus())
+
+        try {
+            block()
+        } finally {
+            val deleteReq = Request("DELETE", endpoint)
+            val deleteRes: Response = client().performRequest(deleteReq)
+            assertEquals("Failed to delete Index $endpoint", RestStatus.OK, deleteRes.restStatus())
+            isClusterGreen("30s")
+        }
+    }
 }