Skip to content

Commit

Permalink
Implemented support for configuring a cluster metrics monitor to call…
Browse files Browse the repository at this point in the history
… cat/indices, and cat/shards.

Signed-off-by: AWSHurneyt <hurneyt@amazon.com>
  • Loading branch information
AWSHurneyt committed Jul 11, 2023
1 parent 5fc607b commit 274cd7d
Show file tree
Hide file tree
Showing 10 changed files with 1,706 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.util.AggregationQueryRewriter
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
import org.opensearch.alerting.util.getRoleFilterEnabled
import org.opensearch.alerting.util.toMap
import org.opensearch.alerting.util.use
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest
import org.opensearch.action.admin.indices.recovery.RecoveryRequest
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.CatIndicesRequestWrapper
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.CatShardsRequestWrapper
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.json.JsonXContent
import org.opensearch.commons.alerting.model.ClusterMetricsInput
Expand Down Expand Up @@ -84,12 +86,14 @@ class SupportedClusterMetricsSettings : org.opensearch.commons.alerting.settings
fun resolveToActionRequest(clusterMetricsInput: ClusterMetricsInput): ActionRequest {
val pathParams = clusterMetricsInput.parsePathParams()
return when (clusterMetricsInput.clusterMetricType) {
ClusterMetricType.CAT_INDICES -> CatIndicesRequestWrapper(pathParams)
ClusterMetricType.CAT_PENDING_TASKS -> PendingClusterTasksRequest()
ClusterMetricType.CAT_RECOVERY -> {
if (pathParams.isEmpty()) return RecoveryRequest()
val pathParamsArray = pathParams.split(",").toTypedArray()
return RecoveryRequest(*pathParamsArray)
}
ClusterMetricType.CAT_SHARDS -> CatShardsRequestWrapper(pathParams)
ClusterMetricType.CAT_SNAPSHOTS -> {
return GetSnapshotsRequest(pathParams, arrayOf(GetSnapshotsRequest.ALL_SNAPSHOTS))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import org.opensearch.index.IndexNotFoundException
class IndexUtils {

companion object {
val VALID_INDEX_NAME_REGEX = Regex("""^(?![_\-\+])(?!.*\.\.)[^\s,\\\/\*\?"<>|#:\.]{1,255}$""")

const val _META = "_meta"
const val SCHEMA_VERSION = "schema_version"

Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.util
package org.opensearch.alerting.util.clusterMetricsMonitorHelpers

import org.opensearch.action.ActionResponse
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
Expand All @@ -29,6 +29,8 @@ import org.opensearch.client.Client
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.support.XContentMapValues
import org.opensearch.commons.alerting.model.ClusterMetricsInput
import kotlin.collections.ArrayList
import kotlin.collections.HashMap

/**
* Calls the appropriate transport action for the API requested in the [clusterMetricsInput].
Expand All @@ -39,9 +41,23 @@ import org.opensearch.commons.alerting.model.ClusterMetricsInput
fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse {
val request = resolveToActionRequest(clusterMetricsInput)
return when (clusterMetricsInput.clusterMetricType) {
ClusterMetricsInput.ClusterMetricType.CAT_INDICES -> {
request as CatIndicesRequestWrapper
val healthResponse = client.admin().cluster().health(request.clusterHealthRequest).get()
val indexSettingsResponse = client.admin().indices().getSettings(request.indexSettingsRequest).get()
val indicesResponse = client.admin().indices().stats(request.indicesStatsRequest).get()
val stateResponse = client.admin().cluster().state(request.clusterStateRequest).get()
return CatIndicesResponseWrapper(healthResponse, stateResponse, indexSettingsResponse, indicesResponse)
}
ClusterMetricsInput.ClusterMetricType.CAT_PENDING_TASKS -> client.admin().cluster()
.pendingClusterTasks(request as PendingClusterTasksRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_RECOVERY -> client.admin().indices().recoveries(request as RecoveryRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_SHARDS -> {
request as CatShardsRequestWrapper
val stateResponse = client.admin().cluster().state(request.clusterStateRequest).get()
val indicesResponse = client.admin().indices().stats(request.indicesStatsRequest).get()
return CatShardsResponseWrapper(stateResponse, indicesResponse)
}
ClusterMetricsInput.ClusterMetricType.CAT_SNAPSHOTS -> client.admin().cluster().getSnapshots(request as GetSnapshotsRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_TASKS -> client.admin().cluster().listTasks(request as ListTasksRequest).get()
ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH -> client.admin().cluster().health(request as ClusterHealthRequest).get()
Expand Down Expand Up @@ -74,6 +90,14 @@ fun ActionResponse.toMap(): Map<String, Any> {
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.CLUSTER_SETTINGS.defaultPath)
)
is CatIndicesResponseWrapper -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.CAT_INDICES.defaultPath)
)
is CatShardsResponseWrapper -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.CAT_SHARDS.defaultPath)
)
is NodesStatsResponse -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.NODES_STATS.defaultPath)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{
"/_cat/indices": {},
"/_cat/pending_tasks": {},
"/_cat/recovery": {},
"/_cat/shards": {},
"/_cat/snapshots": {},
"/_cat/tasks": {},
"/_cluster/health": {},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.util.clusterMetricsMonitorHelpers

import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.randomClusterMetricsInput
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.CatIndicesResponseWrapper.Companion.WRAPPER_FIELD
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.ClusterMetricsInput
import org.opensearch.core.common.Strings
import org.opensearch.test.OpenSearchSingleNodeTestCase

class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() {
private val path = ClusterMetricsInput.ClusterMetricType.CAT_INDICES.defaultPath

fun `test CatIndicesRequestWrapper validate valid pathParams`() {
// GIVEN
val pathParams = "index1,index-name-2,index-3"

// WHEN
val requestWrapper = CatIndicesRequestWrapper(pathParams = pathParams)

// THEN
assertEquals(3, requestWrapper.clusterHealthRequest.indices().size)
assertEquals(3, requestWrapper.clusterStateRequest.indices().size)
assertEquals(3, requestWrapper.indexSettingsRequest.indices().size)
assertEquals(3, requestWrapper.indicesStatsRequest.indices().size)
}

fun `test CatIndicesRequestWrapper validate without providing pathParams`() {
// GIVEN & WHEN
val requestWrapper = CatIndicesRequestWrapper()

// THEN
assertNull(requestWrapper.clusterHealthRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.indexSettingsRequest.indices())
assertNull(requestWrapper.indicesStatsRequest.indices())
}

fun `test CatIndicesRequestWrapper validate blank pathParams`() {
// GIVEN
val pathParams = " "

// WHEN
val requestWrapper = CatIndicesRequestWrapper(pathParams = pathParams)

// THEN
assertNull(requestWrapper.clusterHealthRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.indexSettingsRequest.indices())
assertNull(requestWrapper.indicesStatsRequest.indices())
}

fun `test CatIndicesRequestWrapper validate empty pathParams`() {
// GIVEN
val pathParams = ""

// WHEN
val requestWrapper = CatIndicesRequestWrapper(pathParams = pathParams)

// THEN
assertNull(requestWrapper.clusterHealthRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.indexSettingsRequest.indices())
assertNull(requestWrapper.indicesStatsRequest.indices())
}

fun `test CatIndicesRequestWrapper validate invalid pathParams`() {
// GIVEN
val pathParams = "_index1,index^2"

// WHEN & THEN
assertThrows(IllegalArgumentException::class.java) { CatIndicesRequestWrapper(pathParams = pathParams) }
}

fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
}.toMap()

testIndices.forEach { (indexName, docCount) ->
repeat(docCount) {
val docId = (it + 1).toString()
val docMessage = """
{
"message": "$indexName doc num $docId"
}
""".trimIndent()
indexDoc(indexName, docId, docMessage)
}
}

/*
Creating a subset of indices to use for the pathParams to test that all indices on the cluster ARE NOT returned.
*/
val pathParamsIndices = testIndices.keys.toList().subList(1, testIndices.size - 1)
val pathParams = pathParamsIndices.joinToString(",")
val input = randomClusterMetricsInput(path = path, pathParams = pathParams)

// WHEN
val responseMap = (executeTransportAction(input, client())).toMap()

// THEN
val shards = responseMap[WRAPPER_FIELD] as List<HashMap<String, String>>
val returnedIndices =
shards.map { (it[CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD] as String) to it }.toMap()

assertEquals(pathParamsIndices.size, returnedIndices.keys.size)
testIndices.forEach { (indexName, docCount) ->
if (pathParamsIndices.contains(indexName)) {
assertEquals(indexName, returnedIndices[indexName]
?.get(CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD) as String)
assertEquals(docCount.toString(), returnedIndices[indexName]
?.get(CatIndicesResponseWrapper.IndexInfo.DOCS_COUNT_FIELD) as String)
}
}
}

fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
}.toMap()

testIndices.forEach { (indexName, docCount) ->
repeat(docCount) {
val docId = (it + 1).toString()
val docMessage = """
{
"message": "$indexName doc num $docId"
}
""".trimIndent()
indexDoc(indexName, docId, docMessage)
}
}

val input = randomClusterMetricsInput(path = path)

// WHEN
val responseMap = (executeTransportAction(input, client())).toMap()

// THEN
val shards = responseMap[WRAPPER_FIELD] as List<HashMap<String, String>>
val returnedIndices =
shards.map { (it[CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD] as String) to it }.toMap()

assertEquals(testIndices.size, returnedIndices.keys.size)
testIndices.forEach { (indexName, docCount) ->
assertEquals(indexName, returnedIndices[indexName]
?.get(CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD) as String)
assertEquals(docCount.toString(), returnedIndices[indexName]
?.get(CatIndicesResponseWrapper.IndexInfo.DOCS_COUNT_FIELD) as String)
}
}

private fun indexDoc(index: String, id: String, doc: String) {
client().prepareIndex(index).setId(id)
.setSource(doc, XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get()
}
}
Loading

0 comments on commit 274cd7d

Please sign in to comment.