From 274cd7ddb1c4dd0ca4d6a9d2ad5907d10e504d3b Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Mon, 10 Jul 2023 19:47:00 -0700 Subject: [PATCH 1/5] Implemented support for configuring a cluster metrics monitor to call cat/indices, and cat/shards. Signed-off-by: AWSHurneyt --- .../org/opensearch/alerting/InputService.kt | 4 +- .../SupportedClusterMetricsSettings.kt | 4 + .../opensearch/alerting/util/IndexUtils.kt | 2 + .../CatIndicesHelpers.kt | 857 ++++++++++++++++++ .../CatShardsHelpers.kt | 491 ++++++++++ ...pportedClusterMetricsSettingsExtensions.kt | 26 +- .../settings/supported_json_payloads.json | 2 + .../CatIndicesWrappersIT.kt | 165 ++++ .../CatShardsWrappersIT.kt | 157 ++++ ...edClusterMetricsSettingsExtensionsTests.kt | 2 +- 10 files changed, 1706 insertions(+), 4 deletions(-) create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt rename alerting/src/main/kotlin/org/opensearch/alerting/util/{ => clusterMetricsMonitorHelpers}/SupportedClusterMetricsSettingsExtensions.kt (81%) create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt rename alerting/src/test/kotlin/org/opensearch/alerting/util/{ => clusterMetricsMonitorHelpers}/SupportedClusterMetricsSettingsExtensionsTests.kt (98%) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 8c22816dc..656affc84 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -14,9 +14,9 @@ import org.opensearch.alerting.opensearchapi.convertToMap import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.util.AggregationQueryRewriter import org.opensearch.alerting.util.addUserBackendRolesFilter -import org.opensearch.alerting.util.executeTransportAction +import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction +import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap import org.opensearch.alerting.util.getRoleFilterEnabled -import org.opensearch.alerting.util.toMap import org.opensearch.alerting.util.use import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/SupportedClusterMetricsSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/SupportedClusterMetricsSettings.kt index c2e7b27a0..f71051ea2 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/SupportedClusterMetricsSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/SupportedClusterMetricsSettings.kt @@ -14,6 +14,8 @@ import org.opensearch.action.admin.cluster.state.ClusterStateRequest import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest import org.opensearch.action.admin.indices.recovery.RecoveryRequest +import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.CatIndicesRequestWrapper +import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.CatShardsRequestWrapper import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.model.ClusterMetricsInput @@ -84,12 +86,14 @@ class SupportedClusterMetricsSettings : org.opensearch.commons.alerting.settings fun resolveToActionRequest(clusterMetricsInput: ClusterMetricsInput): ActionRequest { val pathParams = clusterMetricsInput.parsePathParams() return when (clusterMetricsInput.clusterMetricType) { + ClusterMetricType.CAT_INDICES -> CatIndicesRequestWrapper(pathParams) ClusterMetricType.CAT_PENDING_TASKS -> PendingClusterTasksRequest() ClusterMetricType.CAT_RECOVERY -> { if (pathParams.isEmpty()) return RecoveryRequest() val pathParamsArray = pathParams.split(",").toTypedArray() return RecoveryRequest(*pathParamsArray) } + ClusterMetricType.CAT_SHARDS -> CatShardsRequestWrapper(pathParams) ClusterMetricType.CAT_SNAPSHOTS -> { return GetSnapshotsRequest(pathParams, arrayOf(GetSnapshotsRequest.ALL_SNAPSHOTS)) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt index 27b6fa9fa..1b75f523d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt @@ -26,6 +26,8 @@ import org.opensearch.index.IndexNotFoundException class IndexUtils { companion object { + val VALID_INDEX_NAME_REGEX = Regex("""^(?![_\-\+])(?!.*\.\.)[^\s,\\\/\*\?"<>|#:\.]{1,255}$""") + const val _META = "_meta" const val SCHEMA_VERSION = "schema_version" diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt new file mode 100644 index 000000000..d3447ed99 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt @@ -0,0 +1,857 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util.clusterMetricsMonitorHelpers + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.ActionResponse +import org.opensearch.action.ValidateActions +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse +import org.opensearch.action.admin.cluster.state.ClusterStateRequest +import org.opensearch.action.admin.cluster.state.ClusterStateResponse +import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse +import org.opensearch.action.admin.indices.stats.CommonStats +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse +import org.opensearch.action.support.IndicesOptions +import org.opensearch.alerting.util.IndexUtils.Companion.VALID_INDEX_NAME_REGEX +import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.time.DateFormatter +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.index.IndexSettings +import java.time.Instant +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.util.Locale + +class CatIndicesRequestWrapper(val pathParams: String = "") : ActionRequest() { + val log = LogManager.getLogger(CatIndicesRequestWrapper::class.java) + + var clusterHealthRequest: ClusterHealthRequest = + ClusterHealthRequest().indicesOptions(IndicesOptions.lenientExpandHidden()) + var clusterStateRequest: ClusterStateRequest = + ClusterStateRequest().indicesOptions(IndicesOptions.lenientExpandHidden()) + var indexSettingsRequest: GetSettingsRequest = + GetSettingsRequest() + .indicesOptions(IndicesOptions.lenientExpandHidden()) + .names(IndexSettings.INDEX_SEARCH_THROTTLED.key) + var indicesStatsRequest: IndicesStatsRequest = + IndicesStatsRequest().all().indicesOptions(IndicesOptions.lenientExpandHidden()) + var indicesList = arrayOf() + + init { + if (pathParams.isNotBlank()) { + indicesList = pathParams.split(",").toTypedArray() + + require(validate() == null) { "The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases." } + + clusterHealthRequest = clusterHealthRequest.indices(*indicesList) + clusterStateRequest = clusterStateRequest.indices(*indicesList) + indexSettingsRequest = indexSettingsRequest.indices(*indicesList) + indicesStatsRequest = indicesStatsRequest.indices(*indicesList) + } + } + + override fun validate(): ActionRequestValidationException? { + var exception: ActionRequestValidationException? = null + if (pathParams.isNotBlank() && indicesList.any { !VALID_INDEX_NAME_REGEX.containsMatchIn(it) }) + exception = ValidateActions.addValidationError( + "The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases.", + exception + ) + return exception + } +} + +class CatIndicesResponseWrapper( + clusterHealthResponse: ClusterHealthResponse, + clusterStateResponse: ClusterStateResponse, + indexSettingsResponse: GetSettingsResponse, + indicesStatsResponse: IndicesStatsResponse +) : ActionResponse(), ToXContentObject { + var indexInfoList: List = listOf() + + init { + indexInfoList = compileIndexInfo( + clusterHealthResponse, + clusterStateResponse, + indexSettingsResponse, + indicesStatsResponse + ) + } + + companion object { + const val WRAPPER_FIELD = "indices" + } + + override fun writeTo(out: StreamOutput) { + out.writeList(indexInfoList) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + builder.startArray(WRAPPER_FIELD) + indexInfoList.forEach { it.toXContent(builder, params) } + builder.endArray() + return builder.endObject() + } + + private fun compileIndexInfo( + clusterHealthResponse: ClusterHealthResponse, + clusterStateResponse: ClusterStateResponse, + indexSettingsResponse: GetSettingsResponse, + indicesStatsResponse: IndicesStatsResponse + ): List { + val list = mutableListOf() + + val indicesSettings = indexSettingsResponse.indexToSettings + val indicesHealths = clusterHealthResponse.indices + val indicesStats = indicesStatsResponse.indices + val indicesMetadatas = hashMapOf() + clusterStateResponse.state.metadata.forEach { indicesMetadatas[it.index.name] = it } + + indicesSettings.forEach { (indexName, settings) -> + if (!indicesMetadatas.containsKey(indexName)) return@forEach + + val indexMetadata = indicesMetadatas[indexName] + val indexState = indexMetadata?.state + val indexStats = indicesStats[indexName] + val searchThrottled = IndexSettings.INDEX_SEARCH_THROTTLED.get(settings) + val indexHealth = indicesHealths[indexName] + + var health = "" + if (indexHealth != null) { + health = indexHealth.status.toString().lowercase(Locale.ROOT) + } else if (indexStats != null) { + health = "red*" + } + + val primaryStats: CommonStats? + val totalStats: CommonStats? + if (indexStats == null || indexState == IndexMetadata.State.CLOSE) { + primaryStats = CommonStats() + totalStats = CommonStats() + } else { + primaryStats = indexStats.primaries + totalStats = indexStats.total + } + + list.add( + IndexInfo( + health = health, + status = indexState.toString().lowercase(Locale.ROOT), + index = indexName, + uuid = indexMetadata?.indexUUID, + pri = "${indexHealth?.numberOfShards}", + rep = "${indexHealth?.numberOfReplicas}", + docsCount = "${primaryStats?.getDocs()?.count}", + docsDeleted = "${primaryStats?.getDocs()?.deleted}", + creationDate = "${indexMetadata?.creationDate}", + creationDateString = DateFormatter.forPattern("strict_date_time") + .format(ZonedDateTime.ofInstant(Instant.ofEpochMilli(indexMetadata!!.creationDate), ZoneOffset.UTC)), + storeSize = "${totalStats?.store?.size}", + priStoreSize = "${primaryStats?.store?.size}", + completionSize = "${totalStats?.completion?.size}", + priCompletionSize = "${primaryStats?.completion?.size}", + fieldDataMemorySize = "${totalStats?.fieldData?.memorySize}", + priFieldDataMemorySize = "${primaryStats?.fieldData?.memorySize}", + fieldDataEvictions = "${totalStats?.fieldData?.evictions}", + priFieldDataEvictions = "${primaryStats?.fieldData?.evictions}", + queryCacheMemorySize = "${totalStats?.queryCache?.memorySize}", + priQueryCacheMemorySize = "${primaryStats?.queryCache?.memorySize}", + queryCacheEvictions = "${totalStats?.queryCache?.evictions}", + priQueryCacheEvictions = "${primaryStats?.queryCache?.evictions}", + requestCacheMemorySize = "${totalStats?.requestCache?.memorySize}", + priRequestCacheMemorySize = "${primaryStats?.requestCache?.memorySize}", + requestCacheEvictions = "${totalStats?.requestCache?.evictions}", + priRequestCacheEvictions = "${primaryStats?.requestCache?.evictions}", + requestCacheHitCount = "${totalStats?.requestCache?.hitCount}", + priRequestCacheHitCount = "${primaryStats?.requestCache?.hitCount}", + requestCacheMissCount = "${totalStats?.requestCache?.missCount}", + priRequestCacheMissCount = "${primaryStats?.requestCache?.missCount}", + flushTotal = "${totalStats?.flush?.total}", + priFlushTotal = "${primaryStats?.flush?.total}", + flushTotalTime = "${totalStats?.flush?.totalTime}", + priFlushTotalTime = "${primaryStats?.flush?.totalTime}", + getCurrent = "${totalStats?.get?.current()}", + priGetCurrent = "${primaryStats?.get?.current()}", + getTime = "${totalStats?.get?.time}", + priGetTime = "${primaryStats?.get?.time}", + getTotal = "${totalStats?.get?.count}", + priGetTotal = "${primaryStats?.get?.count}", + getExistsTime = "${totalStats?.get?.existsTime}", + priGetExistsTime = "${primaryStats?.get?.existsTime}", + getExistsTotal = "${totalStats?.get?.existsCount}", + priGetExistsTotal = "${primaryStats?.get?.existsCount}", + getMissingTime = "${totalStats?.get?.missingTime}", + priGetMissingTime = "${primaryStats?.get?.missingTime}", + getMissingTotal = "${totalStats?.get?.missingCount}", + priGetMissingTotal = "${primaryStats?.get?.missingCount}", + indexingDeleteCurrent = "${totalStats?.indexing?.total?.deleteCurrent}", + priIndexingDeleteCurrent = "${primaryStats?.indexing?.total?.deleteCurrent}", + indexingDeleteTime = "${totalStats?.indexing?.total?.deleteTime}", + priIndexingDeleteTime = "${primaryStats?.indexing?.total?.deleteTime}", + indexingDeleteTotal = "${totalStats?.indexing?.total?.deleteCount}", + priIndexingDeleteTotal = "${primaryStats?.indexing?.total?.deleteCount}", + indexingIndexCurrent = "${totalStats?.indexing?.total?.indexCurrent}", + priIndexingIndexCurrent = "${primaryStats?.indexing?.total?.indexCurrent}", + indexingIndexTime = "${totalStats?.indexing?.total?.indexTime}", + priIndexingIndexTime = "${primaryStats?.indexing?.total?.indexTime}", + indexingIndexTotal = "${totalStats?.indexing?.total?.indexCount}", + priIndexingIndexTotal = "${primaryStats?.indexing?.total?.indexCount}", + indexingIndexFailed = "${totalStats?.indexing?.total?.indexFailedCount}", + priIndexingIndexFailed = "${primaryStats?.indexing?.total?.indexFailedCount}", + mergesCurrent = "${totalStats?.merge?.current}", + priMergesCurrent = "${primaryStats?.merge?.current}", + mergesCurrentDocs = "${totalStats?.merge?.currentNumDocs}", + priMergesCurrentDocs = "${primaryStats?.merge?.currentNumDocs}", + mergesCurrentSize = "${totalStats?.merge?.currentSize}", + priMergesCurrentSize = "${primaryStats?.merge?.currentSize}", + mergesTotal = "${totalStats?.merge?.total}", + priMergesTotal = "${primaryStats?.merge?.total}", + mergesTotalDocs = "${totalStats?.merge?.totalNumDocs}", + priMergesTotalDocs = "${primaryStats?.merge?.totalNumDocs}", + mergesTotalSize = "${totalStats?.merge?.totalSize}", + priMergesTotalSize = "${primaryStats?.merge?.totalSize}", + mergesTotalTime = "${totalStats?.merge?.totalTime}", + priMergesTotalTime = "${primaryStats?.merge?.totalTime}", + refreshTotal = "${totalStats?.refresh?.total}", + priRefreshTotal = "${primaryStats?.refresh?.total}", + refreshTime = "${totalStats?.refresh?.totalTime}", + priRefreshTime = "${primaryStats?.refresh?.totalTime}", + refreshExternalTotal = "${totalStats?.refresh?.externalTotal}", + priRefreshExternalTotal = "${primaryStats?.refresh?.externalTotal}", + refreshExternalTime = "${totalStats?.refresh?.externalTotalTime}", + priRefreshExternalTime = "${primaryStats?.refresh?.externalTotalTime}", + refreshListeners = "${totalStats?.refresh?.listeners}", + priRefreshListeners = "${primaryStats?.refresh?.listeners}", + searchFetchCurrent = "${totalStats?.search?.total?.fetchCurrent}", + priSearchFetchCurrent = "${primaryStats?.search?.total?.fetchCurrent}", + searchFetchTime = "${totalStats?.search?.total?.fetchTime}", + priSearchFetchTime = "${primaryStats?.search?.total?.fetchTime}", + searchFetchTotal = "${totalStats?.search?.total?.fetchCount}", + priSearchFetchTotal = "${primaryStats?.search?.total?.fetchCount}", + searchOpenContexts = "${totalStats?.search?.openContexts}", + priSearchOpenContexts = "${primaryStats?.search?.openContexts}", + searchQueryCurrent = "${totalStats?.search?.total?.queryCurrent}", + priSearchQueryCurrent = "${primaryStats?.search?.total?.queryCurrent}", + searchQueryTime = "${totalStats?.search?.total?.queryTime}", + priSearchQueryTime = "${primaryStats?.search?.total?.queryTime}", + searchQueryTotal = "${totalStats?.search?.total?.queryCount}", + priSearchQueryTotal = "${primaryStats?.search?.total?.queryCount}", + searchScrollCurrent = "${totalStats?.search?.total?.scrollCurrent}", + priSearchScrollCurrent = "${primaryStats?.search?.total?.scrollCurrent}", + searchScrollTime = "${totalStats?.search?.total?.scrollTime}", + priSearchScrollTime = "${primaryStats?.search?.total?.scrollTime}", + searchScrollTotal = "${totalStats?.search?.total?.scrollCount}", + priSearchScrollTotal = "${primaryStats?.search?.total?.scrollCount}", + searchPointInTimeCurrent = "${totalStats?.search?.total?.pitCurrent}", + priSearchPointInTimeCurrent = "${primaryStats?.search?.total?.pitCurrent}", + searchPointInTimeTime = "${totalStats?.search?.total?.pitTime}", + priSearchPointInTimeTime = "${primaryStats?.search?.total?.pitTime}", + searchPointInTimeTotal = "${totalStats?.search?.total?.pitCount}", + priSearchPointInTimeTotal = "${primaryStats?.search?.total?.pitCount}", + segmentsCount = "${totalStats?.segments?.count}", + priSegmentsCount = "${primaryStats?.segments?.count}", + segmentsMemory = "${totalStats?.segments?.zeroMemory}", + priSegmentsMemory = "${primaryStats?.segments?.zeroMemory}", + segmentsIndexWriterMemory = "${totalStats?.segments?.indexWriterMemory}", + priSegmentsIndexWriterMemory = "${primaryStats?.segments?.indexWriterMemory}", + segmentsVersionMapMemory = "${totalStats?.segments?.versionMapMemory}", + priSegmentsVersionMapMemory = "${primaryStats?.segments?.versionMapMemory}", + segmentsFixedBitsetMemory = "${totalStats?.segments?.bitsetMemory}", + priSegmentsFixedBitsetMemory = "${primaryStats?.segments?.bitsetMemory}", + warmerCurrent = "${totalStats?.warmer?.current()}", + priWarmerCurrent = "${primaryStats?.warmer?.current()}", + warmerTotal = "${totalStats?.warmer?.total()}", + priWarmerTotal = "${primaryStats?.warmer?.total()}", + warmerTotalTime = "${totalStats?.warmer?.totalTime()}", + priWarmerTotalTime = "${primaryStats?.warmer?.totalTime()}", + suggestCurrent = "${totalStats?.search?.total?.suggestCurrent}", + priSuggestCurrent = "${primaryStats?.search?.total?.suggestCurrent}", + suggestTime = "${totalStats?.search?.total?.suggestTime}", + priSuggestTime = "${primaryStats?.search?.total?.suggestTime}", + suggestTotal = "${totalStats?.search?.total?.suggestCount}", + priSuggestTotal = "${primaryStats?.search?.total?.suggestCount}", + memoryTotal = "${totalStats?.totalMemory}", + priMemoryTotal = "${primaryStats?.totalMemory}", + searchThrottled = "$searchThrottled", + ) + ) + } + + return list + } + + data class IndexInfo( + val health: String?, + val status: String?, + val index: String?, + val uuid: String?, + val pri: String?, + val rep: String?, + val docsCount: String?, + val docsDeleted: String?, + val creationDate: String?, + val creationDateString: String?, + val storeSize: String?, + val priStoreSize: String?, + val completionSize: String?, + val priCompletionSize: String?, + val fieldDataMemorySize: String?, + val priFieldDataMemorySize: String?, + val fieldDataEvictions: String?, + val priFieldDataEvictions: String?, + val queryCacheMemorySize: String?, + val priQueryCacheMemorySize: String?, + val queryCacheEvictions: String?, + val priQueryCacheEvictions: String?, + val requestCacheMemorySize: String?, + val priRequestCacheMemorySize: String?, + val requestCacheEvictions: String?, + val priRequestCacheEvictions: String?, + val requestCacheHitCount: String?, + val priRequestCacheHitCount: String?, + val requestCacheMissCount: String?, + val priRequestCacheMissCount: String?, + val flushTotal: String?, + val priFlushTotal: String?, + val flushTotalTime: String?, + val priFlushTotalTime: String?, + val getCurrent: String?, + val priGetCurrent: String?, + val getTime: String?, + val priGetTime: String?, + val getTotal: String?, + val priGetTotal: String?, + val getExistsTime: String?, + val priGetExistsTime: String?, + val getExistsTotal: String?, + val priGetExistsTotal: String?, + val getMissingTime: String?, + val priGetMissingTime: String?, + val getMissingTotal: String?, + val priGetMissingTotal: String?, + val indexingDeleteCurrent: String?, + val priIndexingDeleteCurrent: String?, + val indexingDeleteTime: String?, + val priIndexingDeleteTime: String?, + val indexingDeleteTotal: String?, + val priIndexingDeleteTotal: String?, + val indexingIndexCurrent: String?, + val priIndexingIndexCurrent: String?, + val indexingIndexTime: String?, + val priIndexingIndexTime: String?, + val indexingIndexTotal: String?, + val priIndexingIndexTotal: String?, + val indexingIndexFailed: String?, + val priIndexingIndexFailed: String?, + val mergesCurrent: String?, + val priMergesCurrent: String?, + val mergesCurrentDocs: String?, + val priMergesCurrentDocs: String?, + val mergesCurrentSize: String?, + val priMergesCurrentSize: String?, + val mergesTotal: String?, + val priMergesTotal: String?, + val mergesTotalDocs: String?, + val priMergesTotalDocs: String?, + val mergesTotalSize: String?, + val priMergesTotalSize: String?, + val mergesTotalTime: String?, + val priMergesTotalTime: String?, + val refreshTotal: String?, + val priRefreshTotal: String?, + val refreshTime: String?, + val priRefreshTime: String?, + val refreshExternalTotal: String?, + val priRefreshExternalTotal: String?, + val refreshExternalTime: String?, + val priRefreshExternalTime: String?, + val refreshListeners: String?, + val priRefreshListeners: String?, + val searchFetchCurrent: String?, + val priSearchFetchCurrent: String?, + val searchFetchTime: String?, + val priSearchFetchTime: String?, + val searchFetchTotal: String?, + val priSearchFetchTotal: String?, + val searchOpenContexts: String?, + val priSearchOpenContexts: String?, + val searchQueryCurrent: String?, + val priSearchQueryCurrent: String?, + val searchQueryTime: String?, + val priSearchQueryTime: String?, + val searchQueryTotal: String?, + val priSearchQueryTotal: String?, + val searchScrollCurrent: String?, + val priSearchScrollCurrent: String?, + val searchScrollTime: String?, + val priSearchScrollTime: String?, + val searchScrollTotal: String?, + val priSearchScrollTotal: String?, + val searchPointInTimeCurrent: String?, + val priSearchPointInTimeCurrent: String?, + val searchPointInTimeTime: String?, + val priSearchPointInTimeTime: String?, + val searchPointInTimeTotal: String?, + val priSearchPointInTimeTotal: String?, + val segmentsCount: String?, + val priSegmentsCount: String?, + val segmentsMemory: String?, + val priSegmentsMemory: String?, + val segmentsIndexWriterMemory: String?, + val priSegmentsIndexWriterMemory: String?, + val segmentsVersionMapMemory: String?, + val priSegmentsVersionMapMemory: String?, + val segmentsFixedBitsetMemory: String?, + val priSegmentsFixedBitsetMemory: String?, + val warmerCurrent: String?, + val priWarmerCurrent: String?, + val warmerTotal: String?, + val priWarmerTotal: String?, + val warmerTotalTime: String?, + val priWarmerTotalTime: String?, + val suggestCurrent: String?, + val priSuggestCurrent: String?, + val suggestTime: String?, + val priSuggestTime: String?, + val suggestTotal: String?, + val priSuggestTotal: String?, + val memoryTotal: String?, + val priMemoryTotal: String?, + val searchThrottled: String? + ) : ToXContentObject, Writeable { + companion object { + const val HEALTH_FIELD = "health" + const val STATUS_FIELD = "status" + const val INDEX_FIELD = "index" + const val UUID_FIELD = "uuid" + const val PRI_FIELD = "pri" + const val REP_FIELD = "rep" + const val DOCS_COUNT_FIELD = "docs.count" + const val DOCS_DELETED_FIELD = "docs.deleted" + const val CREATION_DATE_FIELD = "creation.date" + const val CREATION_DATE_STRING_FIELD = "creation.date.string" + const val STORE_SIZE_FIELD = "store.size" + const val PRI_STORE_SIZE_FIELD = "pri.store.size" + const val COMPLETION_SIZE_FIELD = "completion.size" + const val PRI_COMPLETION_SIZE_FIELD = "pri.completion.size" + const val FIELD_DATA_MEMORY_SIZE_FIELD = "fielddata.memory_size" + const val PRI_FIELD_DATA_MEMORY_SIZE_FIELD = "pri.fielddata.memory_size" + const val FIELD_DATA_EVICTIONS_FIELD = "fielddata.evictions" + const val PRI_FIELD_DATA_EVICTIONS_FIELD = "pri.fielddata.evictions" + const val QUERY_CACHE_MEMORY_SIZE_FIELD = "query_cache.memory_size" + const val PRI_QUERY_CACHE_MEMORY_SIZE_FIELD = "pri.query_cache.memory_size" + const val QUERY_CACHE_EVICTIONS_FIELD = "query_cache.evictions" + const val PRI_QUERY_CACHE_EVICTIONS_FIELD = "pri.query_cache.evictions" + const val REQUEST_CACHE_MEMORY_SIZE_FIELD = "request_cache.memory_size" + const val PRI_REQUEST_CACHE_MEMORY_SIZE_FIELD = "pri.request_cache.memory_size" + const val REQUEST_CACHE_EVICTIONS_FIELD = "request_cache.evictions" + const val PRI_REQUEST_CACHE_EVICTIONS_FIELD = "pri.request_cache.evictions" + const val REQUEST_CACHE_HIT_COUNT_FIELD = "request_cache.hit_count" + const val PRI_REQUEST_CACHE_HIT_COUNT_FIELD = "pri.request_cache.hit_count" + const val REQUEST_CACHE_MISS_COUNT_FIELD = "request_cache.miss_count" + const val PRI_REQUEST_CACHE_MISS_COUNT_FIELD = "pri.request_cache.miss_count" + const val FLUSH_TOTAL_FIELD = "flush.total" + const val PRI_FLUSH_TOTAL_FIELD = "pri.flush.total" + const val FLUSH_TOTAL_TIME_FIELD = "flush.total_time" + const val PRI_FLUSH_TOTAL_TIME_FIELD = "pri.flush.total_time" + const val GET_CURRENT_FIELD = "get.current" + const val PRI_GET_CURRENT_FIELD = "pri.get.current" + const val GET_TIME_FIELD = "get.time" + const val PRI_GET_TIME_FIELD = "pri.get.time" + const val GET_TOTAL_FIELD = "get.total" + const val PRI_GET_TOTAL_FIELD = "pri.get.total" + const val GET_EXISTS_TIME_FIELD = "get.exists_time" + const val PRI_GET_EXISTS_TIME_FIELD = "pri.get.exists_time" + const val GET_EXISTS_TOTAL_FIELD = "get.exists_total" + const val PRI_GET_EXISTS_TOTAL_FIELD = "pri.get.exists_total" + const val GET_MISSING_TIME_FIELD = "get.missing_time" + const val PRI_GET_MISSING_TIME_FIELD = "pri.get.missing_time" + const val GET_MISSING_TOTAL_FIELD = "get.missing_total" + const val PRI_GET_MISSING_TOTAL_FIELD = "pri.get.missing_total" + const val INDEXING_DELETE_CURRENT_FIELD = "indexing.delete_current" + const val PRI_INDEXING_DELETE_CURRENT_FIELD = "pri.indexing.delete_current" + const val INDEXING_DELETE_TIME_FIELD = "indexing.delete_time" + const val PRI_INDEXING_DELETE_TIME_FIELD = "pri.indexing.delete_time" + const val INDEXING_DELETE_TOTAL_FIELD = "indexing.delete_total" + const val PRI_INDEXING_DELETE_TOTAL_FIELD = "pri.indexing.delete_total" + const val INDEXING_INDEX_CURRENT_FIELD = "indexing.index_current" + const val PRI_INDEXING_INDEX_CURRENT_FIELD = "pri.indexing.index_current" + const val INDEXING_INDEX_TIME_FIELD = "indexing.index_time" + const val PRI_INDEXING_INDEX_TIME_FIELD = "pri.indexing.index_time" + const val INDEXING_INDEX_TOTAL_FIELD = "indexing.index_total" + const val PRI_INDEXING_INDEX_TOTAL_FIELD = "pri.indexing.index_total" + const val INDEXING_INDEX_FAILED_FIELD = "indexing.index_failed" + const val PRI_INDEXING_INDEX_FAILED_FIELD = "pri.indexing.index_failed" + const val MERGES_CURRENT_FIELD = "merges.current" + const val PRI_MERGES_CURRENT_FIELD = "pri.merges.current" + const val MERGES_CURRENT_DOCS_FIELD = "merges.current_docs" + const val PRI_MERGES_CURRENT_DOCS_FIELD = "pri.merges.current_docs" + const val MERGES_CURRENT_SIZE_FIELD = "merges.current_size" + const val PRI_MERGES_CURRENT_SIZE_FIELD = "pri.merges.current_size" + const val MERGES_TOTAL_FIELD = "merges.total" + const val PRI_MERGES_TOTAL_FIELD = "pri.merges.total" + const val MERGES_TOTAL_DOCS_FIELD = "merges.total_docs" + const val PRI_MERGES_TOTAL_DOCS_FIELD = "pri.merges.total_docs" + const val MERGES_TOTAL_SIZE_FIELD = "merges.total_size" + const val PRI_MERGES_TOTAL_SIZE_FIELD = "pri.merges.total_size" + const val MERGES_TOTAL_TIME_FIELD = "merges.total_time" + const val PRI_MERGES_TOTAL_TIME_FIELD = "pri.merges.total_time" + const val REFRESH_TOTAL_FIELD = "refresh.total" + const val PRI_REFRESH_TOTAL_FIELD = "pri.refresh.total" + const val REFRESH_TIME_FIELD = "refresh.time" + const val PRI_REFRESH_TIME_FIELD = "pri.refresh.time" + const val REFRESH_EXTERNAL_TOTAL_FIELD = "refresh.external_total" + const val PRI_REFRESH_EXTERNAL_TOTAL_FIELD = "pri.refresh.external_total" + const val REFRESH_EXTERNAL_TIME_FIELD = "refresh.external_time" + const val PRI_REFRESH_EXTERNAL_TIME_FIELD = "pri.refresh.external_time" + const val REFRESH_LISTENERS_FIELD = "refresh.listeners" + const val PRI_REFRESH_LISTENERS_FIELD = "pri.refresh.listeners" + const val SEARCH_FETCH_CURRENT_FIELD = "search.fetch_current" + const val PRI_SEARCH_FETCH_CURRENT_FIELD = "pri.search.fetch_current" + const val SEARCH_FETCH_TIME_FIELD = "search.fetch_time" + const val PRI_SEARCH_FETCH_TIME_FIELD = "pri.search.fetch_time" + const val SEARCH_FETCH_TOTAL_FIELD = "search.fetch_total" + const val PRI_SEARCH_FETCH_TOTAL_FIELD = "pri.search.fetch_total" + const val SEARCH_OPEN_CONTEXTS_FIELD = "search.open_contexts" + const val PRI_SEARCH_OPEN_CONTEXTS_FIELD = "pri.search.open_contexts" + const val SEARCH_QUERY_CURRENT_FIELD = "search.query_current" + const val PRI_SEARCH_QUERY_CURRENT_FIELD = "pri.search.query_current" + const val SEARCH_QUERY_TIME_FIELD = "search.query_time" + const val PRI_SEARCH_QUERY_TIME_FIELD = "pri.search.query_time" + const val SEARCH_QUERY_TOTAL_FIELD = "search.query_total" + const val PRI_SEARCH_QUERY_TOTAL_FIELD = "pri.search.query_total" + const val SEARCH_SCROLL_CURRENT_FIELD = "search.scroll_current" + const val PRI_SEARCH_SCROLL_CURRENT_FIELD = "pri.search.scroll_current" + const val SEARCH_SCROLL_TIME_FIELD = "search.scroll_time" + const val PRI_SEARCH_SCROLL_TIME_FIELD = "pri.search.scroll_time" + const val SEARCH_SCROLL_TOTAL_FIELD = "search.scroll_total" + const val PRI_SEARCH_SCROLL_TOTAL_FIELD = "pri.search.scroll_total" + const val SEARCH_POINT_IN_TIME_CURRENT_FIELD = "search.point_in_time_current" + const val PRI_SEARCH_POINT_IN_TIME_CURRENT_FIELD = "pri.search.point_in_time_current" + const val SEARCH_POINT_IN_TIME_TIME_FIELD = "search.point_in_time_time" + const val PRI_SEARCH_POINT_IN_TIME_TIME_FIELD = "pri.search.point_in_time_time" + const val SEARCH_POINT_IN_TIME_TOTAL_FIELD = "search.point_in_time_total" + const val PRI_SEARCH_POINT_IN_TIME_TOTAL_FIELD = "pri.search.point_in_time_total" + const val SEGMENTS_COUNT_FIELD = "segments.count" + const val PRI_SEGMENTS_COUNT_FIELD = "pri.segments.count" + const val SEGMENTS_MEMORY_FIELD = "segments.memory" + const val PRI_SEGMENTS_MEMORY_FIELD = "pri.segments.memory" + const val SEGMENTS_INDEX_WRITER_MEMORY_FIELD = "segments.index_writer_memory" + const val PRI_SEGMENTS_INDEX_WRITER_MEMORY_FIELD = "pri.segments.index_writer_memory" + const val SEGMENTS_VERSION_MAP_MEMORY_FIELD = "segments.version_map_memory" + const val PRI_SEGMENTS_VERSION_MAP_MEMORY_FIELD = "pri.segments.version_map_memory" + const val SEGMENTS_FIXED_BITSET_MEMORY_FIELD = "segments.fixed_bitset_memory" + const val PRI_SEGMENTS_FIXED_BITSET_MEMORY_FIELD = "pri.segments.fixed_bitset_memory" + const val WARMER_CURRENT_FIELD = "warmer.current" + const val PRI_WARMER_CURRENT_FIELD = "pri.warmer.current" + const val WARMER_TOTAL_FIELD = "warmer.total" + const val PRI_WARMER_TOTAL_FIELD = "pri.warmer.total" + const val WARMER_TOTAL_TIME_FIELD = "warmer.total_time" + const val PRI_WARMER_TOTAL_TIME_FIELD = "pri.warmer.total_time" + const val SUGGEST_CURRENT_FIELD = "suggest.current" + const val PRI_SUGGEST_CURRENT_FIELD = "pri.suggest.current" + const val SUGGEST_TIME_FIELD = "suggest.time" + const val PRI_SUGGEST_TIME_FIELD = "pri.suggest.time" + const val SUGGEST_TOTAL_FIELD = "suggest.total" + const val PRI_SUGGEST_TOTAL_FIELD = "pri.suggest.total" + const val MEMORY_TOTAL_FIELD = "memory.total" + const val PRI_MEMORY_TOTAL_FIELD = "pri.memory.total" + const val SEARCH_THROTTLED_FIELD = "search.throttled" + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(HEALTH_FIELD, health) + .field(STATUS_FIELD, status) + .field(INDEX_FIELD, index) + .field(UUID_FIELD, uuid) + .field(PRI_FIELD, pri) + .field(REP_FIELD, rep) + .field(DOCS_COUNT_FIELD, docsCount) + .field(DOCS_DELETED_FIELD, docsDeleted) + .field(CREATION_DATE_FIELD, creationDate) + .field(CREATION_DATE_STRING_FIELD, creationDateString) + .field(STORE_SIZE_FIELD, storeSize) + .field(PRI_STORE_SIZE_FIELD, priStoreSize) + .field(COMPLETION_SIZE_FIELD, completionSize) + .field(PRI_COMPLETION_SIZE_FIELD, priCompletionSize) + .field(FIELD_DATA_MEMORY_SIZE_FIELD, fieldDataMemorySize) + .field(PRI_FIELD_DATA_MEMORY_SIZE_FIELD, priFieldDataMemorySize) + .field(FIELD_DATA_EVICTIONS_FIELD, fieldDataEvictions) + .field(PRI_FIELD_DATA_EVICTIONS_FIELD, priFieldDataEvictions) + .field(QUERY_CACHE_MEMORY_SIZE_FIELD, queryCacheMemorySize) + .field(PRI_QUERY_CACHE_MEMORY_SIZE_FIELD, priQueryCacheMemorySize) + .field(QUERY_CACHE_EVICTIONS_FIELD, queryCacheEvictions) + .field(PRI_QUERY_CACHE_EVICTIONS_FIELD, priQueryCacheEvictions) + .field(REQUEST_CACHE_MEMORY_SIZE_FIELD, requestCacheMemorySize) + .field(PRI_REQUEST_CACHE_MEMORY_SIZE_FIELD, priRequestCacheMemorySize) + .field(REQUEST_CACHE_EVICTIONS_FIELD, requestCacheEvictions) + .field(PRI_REQUEST_CACHE_EVICTIONS_FIELD, priRequestCacheEvictions) + .field(REQUEST_CACHE_HIT_COUNT_FIELD, requestCacheHitCount) + .field(PRI_REQUEST_CACHE_HIT_COUNT_FIELD, priRequestCacheHitCount) + .field(REQUEST_CACHE_MISS_COUNT_FIELD, requestCacheMissCount) + .field(PRI_REQUEST_CACHE_MISS_COUNT_FIELD, priRequestCacheMissCount) + .field(FLUSH_TOTAL_FIELD, flushTotal) + .field(PRI_FLUSH_TOTAL_FIELD, priFlushTotal) + .field(FLUSH_TOTAL_TIME_FIELD, flushTotalTime) + .field(PRI_FLUSH_TOTAL_TIME_FIELD, priFlushTotalTime) + .field(GET_CURRENT_FIELD, getCurrent) + .field(PRI_GET_CURRENT_FIELD, priGetCurrent) + .field(GET_TIME_FIELD, getTime) + .field(PRI_GET_TIME_FIELD, priGetTime) + .field(GET_TOTAL_FIELD, getTotal) + .field(PRI_GET_TOTAL_FIELD, priGetTotal) + .field(GET_EXISTS_TIME_FIELD, getExistsTime) + .field(PRI_GET_EXISTS_TIME_FIELD, priGetExistsTime) + .field(GET_EXISTS_TOTAL_FIELD, getExistsTotal) + .field(PRI_GET_EXISTS_TOTAL_FIELD, priGetExistsTotal) + .field(GET_MISSING_TIME_FIELD, getMissingTime) + .field(PRI_GET_MISSING_TIME_FIELD, priGetMissingTime) + .field(GET_MISSING_TOTAL_FIELD, getMissingTotal) + .field(PRI_GET_MISSING_TOTAL_FIELD, priGetMissingTotal) + .field(INDEXING_DELETE_CURRENT_FIELD, indexingDeleteCurrent) + .field(PRI_INDEXING_DELETE_CURRENT_FIELD, priIndexingDeleteCurrent) + .field(INDEXING_DELETE_TIME_FIELD, indexingDeleteTime) + .field(PRI_INDEXING_DELETE_TIME_FIELD, priIndexingDeleteTime) + .field(INDEXING_DELETE_TOTAL_FIELD, indexingDeleteTotal) + .field(PRI_INDEXING_DELETE_TOTAL_FIELD, priIndexingDeleteTotal) + .field(INDEXING_INDEX_CURRENT_FIELD, indexingIndexCurrent) + .field(PRI_INDEXING_INDEX_CURRENT_FIELD, priIndexingIndexCurrent) + .field(INDEXING_INDEX_TIME_FIELD, indexingIndexTime) + .field(PRI_INDEXING_INDEX_TIME_FIELD, priIndexingIndexTime) + .field(INDEXING_INDEX_TOTAL_FIELD, indexingIndexTotal) + .field(PRI_INDEXING_INDEX_TOTAL_FIELD, priIndexingIndexTotal) + .field(INDEXING_INDEX_FAILED_FIELD, indexingIndexFailed) + .field(PRI_INDEXING_INDEX_FAILED_FIELD, priIndexingIndexFailed) + .field(MERGES_CURRENT_FIELD, mergesCurrent) + .field(PRI_MERGES_CURRENT_FIELD, priMergesCurrent) + .field(MERGES_CURRENT_DOCS_FIELD, mergesCurrentDocs) + .field(PRI_MERGES_CURRENT_DOCS_FIELD, priMergesCurrentDocs) + .field(MERGES_CURRENT_SIZE_FIELD, mergesCurrentSize) + .field(PRI_MERGES_CURRENT_SIZE_FIELD, priMergesCurrentSize) + .field(MERGES_TOTAL_FIELD, mergesTotal) + .field(PRI_MERGES_TOTAL_FIELD, priMergesTotal) + .field(MERGES_TOTAL_DOCS_FIELD, mergesTotalDocs) + .field(PRI_MERGES_TOTAL_DOCS_FIELD, priMergesTotalDocs) + .field(MERGES_TOTAL_SIZE_FIELD, mergesTotalSize) + .field(PRI_MERGES_TOTAL_SIZE_FIELD, priMergesTotalSize) + .field(MERGES_TOTAL_TIME_FIELD, mergesTotalTime) + .field(PRI_MERGES_TOTAL_TIME_FIELD, priMergesTotalTime) + .field(REFRESH_TOTAL_FIELD, refreshTotal) + .field(PRI_REFRESH_TOTAL_FIELD, priRefreshTotal) + .field(REFRESH_TIME_FIELD, refreshTime) + .field(PRI_REFRESH_TIME_FIELD, priRefreshTime) + .field(REFRESH_EXTERNAL_TOTAL_FIELD, refreshExternalTotal) + .field(PRI_REFRESH_EXTERNAL_TOTAL_FIELD, priRefreshExternalTotal) + .field(REFRESH_EXTERNAL_TIME_FIELD, refreshExternalTime) + .field(PRI_REFRESH_EXTERNAL_TIME_FIELD, priRefreshExternalTime) + .field(REFRESH_LISTENERS_FIELD, refreshListeners) + .field(PRI_REFRESH_LISTENERS_FIELD, priRefreshListeners) + .field(SEARCH_FETCH_CURRENT_FIELD, searchFetchCurrent) + .field(PRI_SEARCH_FETCH_CURRENT_FIELD, priSearchFetchCurrent) + .field(SEARCH_FETCH_TIME_FIELD, searchFetchTime) + .field(PRI_SEARCH_FETCH_TIME_FIELD, priSearchFetchTime) + .field(SEARCH_FETCH_TOTAL_FIELD, searchFetchTotal) + .field(PRI_SEARCH_FETCH_TOTAL_FIELD, priSearchFetchTotal) + .field(SEARCH_OPEN_CONTEXTS_FIELD, searchOpenContexts) + .field(PRI_SEARCH_OPEN_CONTEXTS_FIELD, priSearchOpenContexts) + .field(SEARCH_QUERY_CURRENT_FIELD, searchQueryCurrent) + .field(PRI_SEARCH_QUERY_CURRENT_FIELD, priSearchQueryCurrent) + .field(SEARCH_QUERY_TIME_FIELD, searchQueryTime) + .field(PRI_SEARCH_QUERY_TIME_FIELD, priSearchQueryTime) + .field(SEARCH_QUERY_TOTAL_FIELD, searchQueryTotal) + .field(PRI_SEARCH_QUERY_TOTAL_FIELD, priSearchQueryTotal) + .field(SEARCH_SCROLL_CURRENT_FIELD, searchScrollCurrent) + .field(PRI_SEARCH_SCROLL_CURRENT_FIELD, priSearchScrollCurrent) + .field(SEARCH_SCROLL_TIME_FIELD, searchScrollTime) + .field(PRI_SEARCH_SCROLL_TIME_FIELD, priSearchScrollTime) + .field(SEARCH_SCROLL_TOTAL_FIELD, searchScrollTotal) + .field(PRI_SEARCH_SCROLL_TOTAL_FIELD, priSearchScrollTotal) + .field(SEARCH_POINT_IN_TIME_CURRENT_FIELD, searchPointInTimeCurrent) + .field(PRI_SEARCH_POINT_IN_TIME_CURRENT_FIELD, priSearchPointInTimeCurrent) + .field(SEARCH_POINT_IN_TIME_TIME_FIELD, searchPointInTimeTime) + .field(PRI_SEARCH_POINT_IN_TIME_TIME_FIELD, priSearchPointInTimeTime) + .field(SEARCH_POINT_IN_TIME_TOTAL_FIELD, searchPointInTimeTotal) + .field(PRI_SEARCH_POINT_IN_TIME_TOTAL_FIELD, priSearchPointInTimeTotal) + .field(SEGMENTS_COUNT_FIELD, segmentsCount) + .field(PRI_SEGMENTS_COUNT_FIELD, priSegmentsCount) + .field(SEGMENTS_MEMORY_FIELD, segmentsMemory) + .field(PRI_SEGMENTS_MEMORY_FIELD, priSegmentsMemory) + .field(SEGMENTS_INDEX_WRITER_MEMORY_FIELD, segmentsIndexWriterMemory) + .field(PRI_SEGMENTS_INDEX_WRITER_MEMORY_FIELD, priSegmentsIndexWriterMemory) + .field(SEGMENTS_VERSION_MAP_MEMORY_FIELD, segmentsVersionMapMemory) + .field(PRI_SEGMENTS_VERSION_MAP_MEMORY_FIELD, priSegmentsVersionMapMemory) + .field(SEGMENTS_FIXED_BITSET_MEMORY_FIELD, segmentsFixedBitsetMemory) + .field(PRI_SEGMENTS_FIXED_BITSET_MEMORY_FIELD, priSegmentsFixedBitsetMemory) + .field(WARMER_CURRENT_FIELD, warmerCurrent) + .field(PRI_WARMER_CURRENT_FIELD, priWarmerCurrent) + .field(WARMER_TOTAL_FIELD, warmerTotal) + .field(PRI_WARMER_TOTAL_FIELD, priWarmerTotal) + .field(WARMER_TOTAL_TIME_FIELD, warmerTotalTime) + .field(PRI_WARMER_TOTAL_TIME_FIELD, priWarmerTotalTime) + .field(SUGGEST_CURRENT_FIELD, suggestCurrent) + .field(PRI_SUGGEST_CURRENT_FIELD, priSuggestCurrent) + .field(SUGGEST_TIME_FIELD, suggestTime) + .field(PRI_SUGGEST_TIME_FIELD, priSuggestTime) + .field(SUGGEST_TOTAL_FIELD, suggestTotal) + .field(PRI_SUGGEST_TOTAL_FIELD, priSuggestTotal) + .field(MEMORY_TOTAL_FIELD, memoryTotal) + .field(PRI_MEMORY_TOTAL_FIELD, priMemoryTotal) + .field(SEARCH_THROTTLED_FIELD, searchThrottled) + return builder.endObject() + } + + override fun writeTo(out: StreamOutput) { + out.writeString(health) + out.writeString(status) + out.writeString(index) + out.writeString(uuid) + out.writeString(pri) + out.writeString(rep) + out.writeString(docsCount) + out.writeString(docsDeleted) + out.writeString(creationDate) + out.writeString(creationDateString) + out.writeString(storeSize) + out.writeString(priStoreSize) + out.writeString(completionSize) + out.writeString(priCompletionSize) + out.writeString(fieldDataMemorySize) + out.writeString(priFieldDataMemorySize) + out.writeString(fieldDataEvictions) + out.writeString(priFieldDataEvictions) + out.writeString(queryCacheMemorySize) + out.writeString(priQueryCacheMemorySize) + out.writeString(queryCacheEvictions) + out.writeString(priQueryCacheEvictions) + out.writeString(requestCacheMemorySize) + out.writeString(priRequestCacheMemorySize) + out.writeString(requestCacheEvictions) + out.writeString(priRequestCacheEvictions) + out.writeString(requestCacheHitCount) + out.writeString(priRequestCacheHitCount) + out.writeString(requestCacheMissCount) + out.writeString(priRequestCacheMissCount) + out.writeString(flushTotal) + out.writeString(priFlushTotal) + out.writeString(flushTotalTime) + out.writeString(priFlushTotalTime) + out.writeString(getCurrent) + out.writeString(priGetCurrent) + out.writeString(getTime) + out.writeString(priGetTime) + out.writeString(getTotal) + out.writeString(priGetTotal) + out.writeString(getExistsTime) + out.writeString(priGetExistsTime) + out.writeString(getExistsTotal) + out.writeString(priGetExistsTotal) + out.writeString(getMissingTime) + out.writeString(priGetMissingTime) + out.writeString(getMissingTotal) + out.writeString(priGetMissingTotal) + out.writeString(indexingDeleteCurrent) + out.writeString(priIndexingDeleteCurrent) + out.writeString(indexingDeleteTime) + out.writeString(priIndexingDeleteTime) + out.writeString(indexingDeleteTotal) + out.writeString(priIndexingDeleteTotal) + out.writeString(indexingIndexCurrent) + out.writeString(priIndexingIndexCurrent) + out.writeString(indexingIndexTime) + out.writeString(priIndexingIndexTime) + out.writeString(indexingIndexTotal) + out.writeString(priIndexingIndexTotal) + out.writeString(indexingIndexFailed) + out.writeString(priIndexingIndexFailed) + out.writeString(mergesCurrent) + out.writeString(priMergesCurrent) + out.writeString(mergesCurrentDocs) + out.writeString(priMergesCurrentDocs) + out.writeString(mergesCurrentSize) + out.writeString(priMergesCurrentSize) + out.writeString(mergesTotal) + out.writeString(priMergesTotal) + out.writeString(mergesTotalDocs) + out.writeString(priMergesTotalDocs) + out.writeString(mergesTotalSize) + out.writeString(priMergesTotalSize) + out.writeString(mergesTotalTime) + out.writeString(priMergesTotalTime) + out.writeString(refreshTotal) + out.writeString(priRefreshTotal) + out.writeString(refreshTime) + out.writeString(priRefreshTime) + out.writeString(refreshExternalTotal) + out.writeString(priRefreshExternalTotal) + out.writeString(refreshExternalTime) + out.writeString(priRefreshExternalTime) + out.writeString(refreshListeners) + out.writeString(priRefreshListeners) + out.writeString(searchFetchCurrent) + out.writeString(priSearchFetchCurrent) + out.writeString(searchFetchTime) + out.writeString(priSearchFetchTime) + out.writeString(searchFetchTotal) + out.writeString(priSearchFetchTotal) + out.writeString(searchOpenContexts) + out.writeString(priSearchOpenContexts) + out.writeString(searchQueryCurrent) + out.writeString(priSearchQueryCurrent) + out.writeString(searchQueryTime) + out.writeString(priSearchQueryTime) + out.writeString(searchQueryTotal) + out.writeString(priSearchQueryTotal) + out.writeString(searchScrollCurrent) + out.writeString(priSearchScrollCurrent) + out.writeString(searchScrollTime) + out.writeString(priSearchScrollTime) + out.writeString(searchScrollTotal) + out.writeString(priSearchScrollTotal) + out.writeString(searchPointInTimeCurrent) + out.writeString(priSearchPointInTimeCurrent) + out.writeString(searchPointInTimeTime) + out.writeString(priSearchPointInTimeTime) + out.writeString(searchPointInTimeTotal) + out.writeString(priSearchPointInTimeTotal) + out.writeString(segmentsCount) + out.writeString(priSegmentsCount) + out.writeString(segmentsMemory) + out.writeString(priSegmentsMemory) + out.writeString(segmentsIndexWriterMemory) + out.writeString(priSegmentsIndexWriterMemory) + out.writeString(segmentsVersionMapMemory) + out.writeString(priSegmentsVersionMapMemory) + out.writeString(segmentsFixedBitsetMemory) + out.writeString(priSegmentsFixedBitsetMemory) + out.writeString(warmerCurrent) + out.writeString(priWarmerCurrent) + out.writeString(warmerTotal) + out.writeString(priWarmerTotal) + out.writeString(warmerTotalTime) + out.writeString(priWarmerTotalTime) + out.writeString(suggestCurrent) + out.writeString(priSuggestCurrent) + out.writeString(suggestTime) + out.writeString(priSuggestTime) + out.writeString(suggestTotal) + out.writeString(priSuggestTotal) + out.writeString(memoryTotal) + out.writeString(priMemoryTotal) + out.writeString(searchThrottled) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt new file mode 100644 index 000000000..47c35eca8 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt @@ -0,0 +1,491 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util.clusterMetricsMonitorHelpers + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.ActionResponse +import org.opensearch.action.ValidateActions +import org.opensearch.action.admin.cluster.state.ClusterStateRequest +import org.opensearch.action.admin.cluster.state.ClusterStateResponse +import org.opensearch.action.admin.indices.stats.CommonStats +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse +import org.opensearch.action.admin.indices.stats.ShardStats +import org.opensearch.alerting.util.IndexUtils.Companion.VALID_INDEX_NAME_REGEX +import org.opensearch.cluster.routing.UnassignedInfo +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.unit.TimeValue +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.index.cache.query.QueryCacheStats +import org.opensearch.index.engine.CommitStats +import org.opensearch.index.engine.Engine +import org.opensearch.index.engine.SegmentsStats +import org.opensearch.index.fielddata.FieldDataStats +import org.opensearch.index.flush.FlushStats +import org.opensearch.index.get.GetStats +import org.opensearch.index.merge.MergeStats +import org.opensearch.index.refresh.RefreshStats +import org.opensearch.index.search.stats.SearchStats +import org.opensearch.index.seqno.SeqNoStats +import org.opensearch.index.shard.DocsStats +import org.opensearch.index.store.StoreStats +import org.opensearch.search.suggest.completion.CompletionStats +import java.time.Instant +import java.util.Locale +import java.util.function.Function + +class CatShardsRequestWrapper(val pathParams: String = "") : ActionRequest() { + var clusterStateRequest: ClusterStateRequest = + ClusterStateRequest().clear().nodes(true).routingTable(true) + var indicesStatsRequest: IndicesStatsRequest = + IndicesStatsRequest().all() + var indicesList = arrayOf() + + init { + if (pathParams.isNotBlank()) { + indicesList = pathParams.split(",").toTypedArray() + + require(validate() == null) { "The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases." } + + clusterStateRequest = clusterStateRequest.indices(*indicesList) + indicesStatsRequest = indicesStatsRequest.indices(*indicesList) + } + } + + override fun validate(): ActionRequestValidationException? { + var exception: ActionRequestValidationException? = null + if (pathParams.isNotBlank() && indicesList.any { !VALID_INDEX_NAME_REGEX.containsMatchIn(it) }) + exception = ValidateActions.addValidationError( + "The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases.", + exception + ) + return exception + } +} + +class CatShardsResponseWrapper( + stateResp: ClusterStateResponse, + indicesResp: IndicesStatsResponse +) : ActionResponse(), ToXContentObject { + var shardInfoList: List = listOf() + + init { + shardInfoList = compileShardInfo(stateResp, indicesResp) + } + + companion object { + const val WRAPPER_FIELD = "shards" + } + + override fun writeTo(out: StreamOutput) { + out.writeList(shardInfoList) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + builder.startArray(WRAPPER_FIELD) + shardInfoList.forEach { it.toXContent(builder, params) } + builder.endArray() + return builder.endObject() + } + + private fun getOrNull(stats: S?, accessor: Function, func: Function): Any? { + if (stats != null) { + val t: T? = accessor.apply(stats) + if (t != null) { + return func.apply(t) + } + } + return null + } + + private fun compileShardInfo( + stateResp: ClusterStateResponse, + indicesResp: IndicesStatsResponse + ): List { + val list = mutableListOf() + + for (shard in stateResp.state.routingTable.allShards()) { + val shardStats = indicesResp.asMap()[shard] + var commonStats: CommonStats? = null + var commitStats: CommitStats? = null + if (shardStats != null) { + commonStats = shardStats.stats + commitStats = shardStats.commitStats + } + + var shardInfo = ShardInfo( + index = shard.indexName, + shard = "${shard.id}", + primaryOrReplica = if (shard.primary()) "p" else "r", + state = shard.state().name, + docs = getOrNull(commonStats, CommonStats::getDocs, DocsStats::getCount)?.toString(), + store = getOrNull(commonStats, CommonStats::getStore, StoreStats::getSize)?.toString(), + id = null, // Added below + node = null, // Added below + completionSize = getOrNull(commonStats, CommonStats::getCompletion, CompletionStats::getSize)?.toString(), + fieldDataMemory = getOrNull(commonStats, CommonStats::getFieldData, FieldDataStats::getMemorySize)?.toString(), + fieldDataEvictions = getOrNull(commonStats, CommonStats::getFieldData, FieldDataStats::getEvictions)?.toString(), + flushTotal = getOrNull(commonStats, CommonStats::getFlush, FlushStats::getTotal)?.toString(), + flushTotalTime = getOrNull(commonStats, CommonStats::getFlush, FlushStats::getTotalTime)?.toString(), + getCurrent = getOrNull(commonStats, CommonStats::getGet, GetStats::current)?.toString(), + getTime = getOrNull(commonStats, CommonStats::getGet, GetStats::getTime)?.toString(), + getTotal = getOrNull(commonStats, CommonStats::getGet, GetStats::getCount)?.toString(), + getExistsTime = getOrNull(commonStats, CommonStats::getGet, GetStats::getExistsTime)?.toString(), + getExistsTotal = getOrNull(commonStats, CommonStats::getGet, GetStats::getExistsCount)?.toString(), + getMissingTime = getOrNull(commonStats, CommonStats::getGet, GetStats::getMissingTime)?.toString(), + getMissingTotal = getOrNull(commonStats, CommonStats::getGet, GetStats::getMissingCount)?.toString(), + indexingDeleteCurrent = getOrNull(commonStats, CommonStats::getIndexing, { it.total.deleteCurrent })?.toString(), + indexingDeleteTime = getOrNull(commonStats, CommonStats::getIndexing, { it.total.deleteTime })?.toString(), + indexingDeleteTotal = getOrNull(commonStats, CommonStats::getIndexing, { it.total.deleteCount })?.toString(), + indexingIndexCurrent = getOrNull(commonStats, CommonStats::getIndexing, { it.total.indexCurrent })?.toString(), + indexingIndexTime = getOrNull(commonStats, CommonStats::getIndexing, { it.total.indexTime })?.toString(), + indexingIndexTotal = getOrNull(commonStats, CommonStats::getIndexing, { it.total.indexCount })?.toString(), + indexingIndexFailed = getOrNull(commonStats, CommonStats::getIndexing, { it.total.indexFailedCount })?.toString(), + mergesCurrent = getOrNull(commonStats, CommonStats::getMerge, MergeStats::getCurrent)?.toString(), + mergesCurrentDocs = getOrNull(commonStats, CommonStats::getMerge, MergeStats::getCurrentNumDocs)?.toString(), + mergesCurrentSize = getOrNull(commonStats, CommonStats::getMerge, MergeStats::getCurrentSize)?.toString(), + mergesTotal = getOrNull(commonStats, CommonStats::getMerge, MergeStats::getTotal)?.toString(), + mergesTotalDocs = getOrNull(commonStats, CommonStats::getMerge, MergeStats::getTotalNumDocs)?.toString(), + mergesTotalSize = getOrNull(commonStats, CommonStats::getMerge, MergeStats::getTotalSize)?.toString(), + mergesTotalTime = getOrNull(commonStats, CommonStats::getMerge, MergeStats::getTotalTime)?.toString(), + queryCacheMemory = getOrNull(commonStats, CommonStats::getQueryCache, QueryCacheStats::getMemorySize)?.toString(), + queryCacheEvictions = getOrNull(commonStats, CommonStats::getQueryCache, QueryCacheStats::getEvictions)?.toString(), + recoverySourceType = null, // Added below + refreshTotal = getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getTotal)?.toString(), + refreshTime = getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getTotalTime)?.toString(), + searchFetchCurrent = getOrNull(commonStats, CommonStats::getSearch, { it.total.fetchCurrent })?.toString(), + searchFetchTime = getOrNull(commonStats, CommonStats::getSearch, { it.total.fetchTime })?.toString(), + searchFetchTotal = getOrNull(commonStats, CommonStats::getSearch, { it.total.fetchCount })?.toString(), + searchOpenContexts = getOrNull(commonStats, CommonStats::getSearch, SearchStats::getOpenContexts)?.toString(), + searchQueryCurrent = getOrNull(commonStats, CommonStats::getSearch, { it.total.queryCurrent })?.toString(), + searchQueryTime = getOrNull(commonStats, CommonStats::getSearch, { it.total.queryTime })?.toString(), + searchQueryTotal = getOrNull(commonStats, CommonStats::getSearch, { it.total.queryCount })?.toString(), + searchScrollCurrent = getOrNull(commonStats, CommonStats::getSearch, { it.total.scrollCurrent })?.toString(), + searchScrollTime = getOrNull(commonStats, CommonStats::getSearch, { it.total.scrollTime })?.toString(), + searchScrollTotal = getOrNull(commonStats, CommonStats::getSearch, { it.total.scrollCount })?.toString(), + segmentsCount = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getCount)?.toString(), + segmentsMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getZeroMemory)?.toString(), + segmentsIndexWriterMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getIndexWriterMemory)?.toString(), + segmentsVersionMapMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getVersionMapMemory)?.toString(), + fixedBitsetMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getBitsetMemory)?.toString(), + globalCheckpoint = getOrNull(shardStats, ShardStats::getSeqNoStats, SeqNoStats::getGlobalCheckpoint)?.toString(), + localCheckpoint = getOrNull(shardStats, ShardStats::getSeqNoStats, SeqNoStats::getLocalCheckpoint)?.toString(), + maxSeqNo = getOrNull(shardStats, ShardStats::getSeqNoStats, SeqNoStats::getMaxSeqNo)?.toString(), + syncId = commitStats?.userData?.get(Engine.SYNC_COMMIT_ID), + unassignedAt = null, // Added below + unassignedDetails = null, // Added below + unassignedFor = null, // Added below + unassignedReason = null // Added below + ) + + if (shard.assignedToNode()) { + val id = shard.currentNodeId() + val node = StringBuilder() + node.append(stateResp.state.nodes().get(id).name) + + if (shard.relocating()) { + val reloNodeId = shard.relocatingNodeId() + val reloName = stateResp.state.nodes().get(reloNodeId).name + node.append(" -> ") + node.append(reloNodeId) + node.append(" ") + node.append(reloName) + } + + shardInfo = shardInfo.copy( + id = id, + node = node.toString() + ) + } + + if (shard.unassignedInfo() != null) { + val unassignedTime = Instant.ofEpochMilli(shard.unassignedInfo().unassignedTimeInMillis) + shardInfo = shardInfo.copy( + unassignedReason = shard.unassignedInfo().reason.name, + unassignedAt = UnassignedInfo.DATE_TIME_FORMATTER.format(unassignedTime), + unassignedFor = TimeValue.timeValueMillis(System.currentTimeMillis() - shard.unassignedInfo().unassignedTimeInMillis).stringRep, + unassignedDetails = shard.unassignedInfo().details + ) + } + + if (shard.recoverySource() != null) { + shardInfo = shardInfo.copy( + recoverySourceType = shard.recoverySource().type.toString().lowercase(Locale.ROOT) + ) + } + + list.add(shardInfo) + } + return list + } + + data class ShardInfo( + val index: String?, + val shard: String?, + val primaryOrReplica: String?, + val state: String?, + val docs: String?, + val store: String?, + val id: String?, + val node: String?, + val completionSize: String?, + val fieldDataMemory: String?, + val fieldDataEvictions: String?, + val flushTotal: String?, + val flushTotalTime: String?, + val getCurrent: String?, + val getTime: String?, + val getTotal: String?, + val getExistsTime: String?, + val getExistsTotal: String?, + val getMissingTime: String?, + val getMissingTotal: String?, + val indexingDeleteCurrent: String?, + val indexingDeleteTime: String?, + val indexingDeleteTotal: String?, + val indexingIndexCurrent: String?, + val indexingIndexTime: String?, + val indexingIndexTotal: String?, + val indexingIndexFailed: String?, + val mergesCurrent: String?, + val mergesCurrentDocs: String?, + val mergesCurrentSize: String?, + val mergesTotal: String?, + val mergesTotalDocs: String?, + val mergesTotalSize: String?, + val mergesTotalTime: String?, + val queryCacheMemory: String?, + val queryCacheEvictions: String?, + val recoverySourceType: String?, + val refreshTotal: String?, + val refreshTime: String?, + val searchFetchCurrent: String?, + val searchFetchTime: String?, + val searchFetchTotal: String?, + val searchOpenContexts: String?, + val searchQueryCurrent: String?, + val searchQueryTime: String?, + val searchQueryTotal: String?, + val searchScrollCurrent: String?, + val searchScrollTime: String?, + val searchScrollTotal: String?, + val segmentsCount: String?, + val segmentsMemory: String?, + val segmentsIndexWriterMemory: String?, + val segmentsVersionMapMemory: String?, + val fixedBitsetMemory: String?, + val globalCheckpoint: String?, + val localCheckpoint: String?, + val maxSeqNo: String?, + val syncId: String?, + val unassignedAt: String?, + val unassignedDetails: String?, + val unassignedFor: String?, + val unassignedReason: String? + ) : ToXContentObject, Writeable { + companion object { + const val INDEX_FIELD = "index" + const val SHARD_FIELD = "shard" + const val PRIMARY_OR_REPLICA_FIELD = "primaryOrReplica" + const val STATE_FIELD = "state" + const val DOCS_FIELD = "docs" + const val STORE_FIELD = "store" + const val ID_FIELD = "id" + const val NODE_FIELD = "node" + const val COMPLETION_SIZE_FIELD = "completionSize" + const val FIELD_DATA_MEMORY_FIELD = "fielddataMemory" + const val FIELD_DATA_EVICTIONS_FIELD = "fielddataEvictions" + const val FLUSH_TOTAL_FIELD = "flushTotal" + const val FLUSH_TOTAL_TIME_FIELD = "flushTotalTime" + const val GET_CURRENT_FIELD = "getCurrent" + const val GET_TIME_FIELD = "getTime" + const val GET_TOTAL_FIELD = "getTotal" + const val GET_EXISTS_TIME_FIELD = "getExistsTime" + const val GET_EXISTS_TOTAL_FIELD = "getExistsTotal" + const val GET_MISSING_TIME_FIELD = "getMissingTime" + const val GET_MISSING_TOTAL_FIELD = "getMissingTotal" + const val INDEXING_DELETE_CURRENT_FIELD = "indexingDeleteCurrent" + const val INDEXING_DELETE_TIME_FIELD = "indexingDeleteTime" + const val INDEXING_DELETE_TOTAL_FIELD = "indexingDeleteTotal" + const val INDEXING_INDEX_CURRENT_FIELD = "indexingIndexCurrent" + const val INDEXING_INDEX_TIME_FIELD = "indexingIndexTime" + const val INDEXING_INDEX_TOTAL_FIELD = "indexingIndexTotal" + const val INDEXING_INDEX_FAILED_FIELD = "indexingIndexFailed" + const val MERGES_CURRENT_FIELD = "mergesCurrent" + const val MERGES_CURRENT_DOCS_FIELD = "mergesCurrentDocs" + const val MERGES_CURRENT_SIZE_FIELD = "mergesCurrentSize" + const val MERGES_TOTAL_FIELD = "mergesTotal" + const val MERGES_TOTAL_DOCS_FIELD = "mergesTotalDocs" + const val MERGES_TOTAL_SIZE_FIELD = "mergesTotalSize" + const val MERGES_TOTAL_TIME_FIELD = "mergesTotalTime" + const val QUERY_CACHE_MEMORY_FIELD = "queryCacheMemory" + const val QUERY_CACHE_EVICTIONS_FIELD = "queryCacheEvictions" + const val RECOVERY_SOURCE_TYPE_FIELD = "recoverysource.type" + const val REFRESH_TOTAL_FIELD = "refreshTotal" + const val REFRESH_TIME_FIELD = "refreshTime" + const val SEARCH_FETCH_CURRENT_FIELD = "searchFetchCurrent" + const val SEARCH_FETCH_TIME_FIELD = "searchFetchTime" + const val SEARCH_FETCH_TOTAL_FIELD = "searchFetchTotal" + const val SEARCH_OPEN_CONTEXTS_FIELD = "searchOpenContexts" + const val SEARCH_QUERY_CURRENT_FIELD = "searchQueryCurrent" + const val SEARCH_QUERY_TIME_FIELD = "searchQueryTime" + const val SEARCH_QUERY_TOTAL_FIELD = "searchQueryTotal" + const val SEARCH_SCROLL_CURRENT_FIELD = "searchScrollCurrent" + const val SEARCH_SCROLL_TIME_FIELD = "searchScrollTime" + const val SEARCH_SCROLL_TOTAL_FIELD = "searchScrollTotal" + const val SEGMENTS_COUNT_FIELD = "segmentsCount" + const val SEGMENTS_MEMORY_FIELD = "segmentsMemory" + const val SEGMENTS_INDEX_WRITER_MEMORY_FIELD = "segmentsIndexWriterMemory" + const val SEGMENTS_VERSION_MAP_MEMORY_FIELD = "segmentsVersionMapMemory" + const val FIXED_BITSET_MEMORY_FIELD = "fixedBitsetMemory" + const val GLOBAL_CHECKPOINT_FIELD = "globalCheckpoint" + const val LOCAL_CHECKPOINT_FIELD = "localCheckpoint" + const val MAX_SEQ_NO_FIELD = "maxSeqNo" + const val SYNC_ID_FIELD = "sync_id" + const val UNASSIGNED_AT_FIELD = "unassigned.at" + const val UNASSIGNED_DETAILS_FIELD = "unassigned.details" + const val UNASSIGNED_FOR_FIELD = "unassigned.for" + const val UNASSIGNED_REASON_FIELD = "unassigned.reason" + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(INDEX_FIELD, index) + .field(SHARD_FIELD, shard) + .field(PRIMARY_OR_REPLICA_FIELD, primaryOrReplica) + .field(STATE_FIELD, state) + .field(DOCS_FIELD, docs) + .field(STORE_FIELD, store) + .field(ID_FIELD, id) + .field(NODE_FIELD, node) + .field(COMPLETION_SIZE_FIELD, completionSize) + .field(FIELD_DATA_MEMORY_FIELD, fieldDataMemory) + .field(FIELD_DATA_EVICTIONS_FIELD, fieldDataEvictions) + .field(FLUSH_TOTAL_FIELD, flushTotal) + .field(FLUSH_TOTAL_TIME_FIELD, flushTotalTime) + .field(GET_CURRENT_FIELD, getCurrent) + .field(GET_TIME_FIELD, getTime) + .field(GET_TOTAL_FIELD, getTotal) + .field(GET_EXISTS_TIME_FIELD, getExistsTime) + .field(GET_EXISTS_TOTAL_FIELD, getExistsTotal) + .field(GET_MISSING_TIME_FIELD, getMissingTime) + .field(GET_MISSING_TOTAL_FIELD, getMissingTotal) + .field(INDEXING_DELETE_CURRENT_FIELD, indexingDeleteCurrent) + .field(INDEXING_DELETE_TIME_FIELD, indexingDeleteTime) + .field(INDEXING_DELETE_TOTAL_FIELD, indexingDeleteTotal) + .field(INDEXING_INDEX_CURRENT_FIELD, indexingIndexCurrent) + .field(INDEXING_INDEX_TIME_FIELD, indexingIndexTime) + .field(INDEXING_INDEX_TOTAL_FIELD, indexingIndexTotal) + .field(INDEXING_INDEX_FAILED_FIELD, indexingIndexFailed) + .field(MERGES_CURRENT_FIELD, mergesCurrent) + .field(MERGES_CURRENT_DOCS_FIELD, mergesCurrentDocs) + .field(MERGES_CURRENT_SIZE_FIELD, mergesCurrentSize) + .field(MERGES_TOTAL_FIELD, mergesTotal) + .field(MERGES_TOTAL_DOCS_FIELD, mergesTotalDocs) + .field(MERGES_TOTAL_SIZE_FIELD, mergesTotalSize) + .field(MERGES_TOTAL_TIME_FIELD, mergesTotalTime) + .field(QUERY_CACHE_MEMORY_FIELD, queryCacheMemory) + .field(QUERY_CACHE_EVICTIONS_FIELD, queryCacheEvictions) + .field(RECOVERY_SOURCE_TYPE_FIELD, recoverySourceType) + .field(REFRESH_TOTAL_FIELD, refreshTotal) + .field(REFRESH_TIME_FIELD, refreshTime) + .field(SEARCH_FETCH_CURRENT_FIELD, searchFetchCurrent) + .field(SEARCH_FETCH_TIME_FIELD, searchFetchTime) + .field(SEARCH_FETCH_TOTAL_FIELD, searchFetchTotal) + .field(SEARCH_OPEN_CONTEXTS_FIELD, searchOpenContexts) + .field(SEARCH_QUERY_CURRENT_FIELD, searchQueryCurrent) + .field(SEARCH_QUERY_TIME_FIELD, searchQueryTime) + .field(SEARCH_QUERY_TOTAL_FIELD, searchQueryTotal) + .field(SEARCH_SCROLL_CURRENT_FIELD, searchScrollCurrent) + .field(SEARCH_SCROLL_TIME_FIELD, searchScrollTime) + .field(SEARCH_SCROLL_TOTAL_FIELD, searchScrollTotal) + .field(SEGMENTS_COUNT_FIELD, segmentsCount) + .field(SEGMENTS_MEMORY_FIELD, segmentsMemory) + .field(SEGMENTS_INDEX_WRITER_MEMORY_FIELD, segmentsIndexWriterMemory) + .field(SEGMENTS_VERSION_MAP_MEMORY_FIELD, segmentsVersionMapMemory) + .field(FIXED_BITSET_MEMORY_FIELD, fixedBitsetMemory) + .field(GLOBAL_CHECKPOINT_FIELD, globalCheckpoint) + .field(LOCAL_CHECKPOINT_FIELD, localCheckpoint) + .field(MAX_SEQ_NO_FIELD, maxSeqNo) + .field(SYNC_ID_FIELD, syncId) + .field(UNASSIGNED_AT_FIELD, unassignedAt) + .field(UNASSIGNED_DETAILS_FIELD, unassignedDetails) + .field(UNASSIGNED_FOR_FIELD, unassignedFor) + .field(UNASSIGNED_REASON_FIELD, unassignedReason) + return builder.endObject() + } + + override fun writeTo(out: StreamOutput) { + out.writeString(index) + out.writeString(shard) + out.writeString(primaryOrReplica) + out.writeString(state) + out.writeString(docs) + out.writeString(store) + out.writeString(id) + out.writeString(node) + out.writeString(completionSize) + out.writeString(fieldDataMemory) + out.writeString(fieldDataEvictions) + out.writeString(flushTotal) + out.writeString(flushTotalTime) + out.writeString(getCurrent) + out.writeString(getTime) + out.writeString(getTotal) + out.writeString(getExistsTime) + out.writeString(getExistsTotal) + out.writeString(getMissingTime) + out.writeString(getMissingTotal) + out.writeString(indexingDeleteCurrent) + out.writeString(indexingDeleteTime) + out.writeString(indexingDeleteTotal) + out.writeString(indexingIndexCurrent) + out.writeString(indexingIndexTime) + out.writeString(indexingIndexTotal) + out.writeString(indexingIndexFailed) + out.writeString(mergesCurrent) + out.writeString(mergesCurrentDocs) + out.writeString(mergesCurrentSize) + out.writeString(mergesTotal) + out.writeString(mergesTotalDocs) + out.writeString(mergesTotalSize) + out.writeString(mergesTotalTime) + out.writeString(queryCacheMemory) + out.writeString(queryCacheEvictions) + out.writeString(recoverySourceType) + out.writeString(refreshTotal) + out.writeString(refreshTime) + out.writeString(searchFetchCurrent) + out.writeString(searchFetchTime) + out.writeString(searchFetchTotal) + out.writeString(searchOpenContexts) + out.writeString(searchQueryCurrent) + out.writeString(searchQueryTime) + out.writeString(searchQueryTotal) + out.writeString(searchScrollCurrent) + out.writeString(searchScrollTime) + out.writeString(searchScrollTotal) + out.writeString(segmentsCount) + out.writeString(segmentsMemory) + out.writeString(segmentsIndexWriterMemory) + out.writeString(segmentsVersionMapMemory) + out.writeString(fixedBitsetMemory) + out.writeString(globalCheckpoint) + out.writeString(localCheckpoint) + out.writeString(maxSeqNo) + out.writeString(syncId) + out.writeString(unassignedAt) + out.writeString(unassignedDetails) + out.writeString(unassignedFor) + out.writeString(unassignedReason) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensions.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt similarity index 81% rename from alerting/src/main/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensions.kt rename to alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt index 6623ec483..bf5c720c4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensions.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.alerting.util +package org.opensearch.alerting.util.clusterMetricsMonitorHelpers import org.opensearch.action.ActionResponse import org.opensearch.action.admin.cluster.health.ClusterHealthRequest @@ -29,6 +29,8 @@ import org.opensearch.client.Client import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.support.XContentMapValues import org.opensearch.commons.alerting.model.ClusterMetricsInput +import kotlin.collections.ArrayList +import kotlin.collections.HashMap /** * Calls the appropriate transport action for the API requested in the [clusterMetricsInput]. @@ -39,9 +41,23 @@ import org.opensearch.commons.alerting.model.ClusterMetricsInput fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse { val request = resolveToActionRequest(clusterMetricsInput) return when (clusterMetricsInput.clusterMetricType) { + ClusterMetricsInput.ClusterMetricType.CAT_INDICES -> { + request as CatIndicesRequestWrapper + val healthResponse = client.admin().cluster().health(request.clusterHealthRequest).get() + val indexSettingsResponse = client.admin().indices().getSettings(request.indexSettingsRequest).get() + val indicesResponse = client.admin().indices().stats(request.indicesStatsRequest).get() + val stateResponse = client.admin().cluster().state(request.clusterStateRequest).get() + return CatIndicesResponseWrapper(healthResponse, stateResponse, indexSettingsResponse, indicesResponse) + } ClusterMetricsInput.ClusterMetricType.CAT_PENDING_TASKS -> client.admin().cluster() .pendingClusterTasks(request as PendingClusterTasksRequest).get() ClusterMetricsInput.ClusterMetricType.CAT_RECOVERY -> client.admin().indices().recoveries(request as RecoveryRequest).get() + ClusterMetricsInput.ClusterMetricType.CAT_SHARDS -> { + request as CatShardsRequestWrapper + val stateResponse = client.admin().cluster().state(request.clusterStateRequest).get() + val indicesResponse = client.admin().indices().stats(request.indicesStatsRequest).get() + return CatShardsResponseWrapper(stateResponse, indicesResponse) + } ClusterMetricsInput.ClusterMetricType.CAT_SNAPSHOTS -> client.admin().cluster().getSnapshots(request as GetSnapshotsRequest).get() ClusterMetricsInput.ClusterMetricType.CAT_TASKS -> client.admin().cluster().listTasks(request as ListTasksRequest).get() ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH -> client.admin().cluster().health(request as ClusterHealthRequest).get() @@ -74,6 +90,14 @@ fun ActionResponse.toMap(): Map { this.convertToMap(), SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.CLUSTER_SETTINGS.defaultPath) ) + is CatIndicesResponseWrapper -> redactFieldsFromResponse( + this.convertToMap(), + SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.CAT_INDICES.defaultPath) + ) + is CatShardsResponseWrapper -> redactFieldsFromResponse( + this.convertToMap(), + SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.CAT_SHARDS.defaultPath) + ) is NodesStatsResponse -> redactFieldsFromResponse( this.convertToMap(), SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.NODES_STATS.defaultPath) diff --git a/alerting/src/main/resources/org/opensearch/alerting/settings/supported_json_payloads.json b/alerting/src/main/resources/org/opensearch/alerting/settings/supported_json_payloads.json index 9ed045ab3..a153a67b2 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/settings/supported_json_payloads.json +++ b/alerting/src/main/resources/org/opensearch/alerting/settings/supported_json_payloads.json @@ -1,6 +1,8 @@ { + "/_cat/indices": {}, "/_cat/pending_tasks": {}, "/_cat/recovery": {}, + "/_cat/shards": {}, "/_cat/snapshots": {}, "/_cat/tasks": {}, "/_cluster/health": {}, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt new file mode 100644 index 000000000..db8a128d1 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt @@ -0,0 +1,165 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util.clusterMetricsMonitorHelpers + +import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.randomClusterMetricsInput +import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.CatIndicesResponseWrapper.Companion.WRAPPER_FIELD +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.model.ClusterMetricsInput +import org.opensearch.core.common.Strings +import org.opensearch.test.OpenSearchSingleNodeTestCase + +class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() { + private val path = ClusterMetricsInput.ClusterMetricType.CAT_INDICES.defaultPath + + fun `test CatIndicesRequestWrapper validate valid pathParams`() { + // GIVEN + val pathParams = "index1,index-name-2,index-3" + + // WHEN + val requestWrapper = CatIndicesRequestWrapper(pathParams = pathParams) + + // THEN + assertEquals(3, requestWrapper.clusterHealthRequest.indices().size) + assertEquals(3, requestWrapper.clusterStateRequest.indices().size) + assertEquals(3, requestWrapper.indexSettingsRequest.indices().size) + assertEquals(3, requestWrapper.indicesStatsRequest.indices().size) + } + + fun `test CatIndicesRequestWrapper validate without providing pathParams`() { + // GIVEN & WHEN + val requestWrapper = CatIndicesRequestWrapper() + + // THEN + assertNull(requestWrapper.clusterHealthRequest.indices()) + assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices()) + assertEquals(Strings.EMPTY_ARRAY, requestWrapper.indexSettingsRequest.indices()) + assertNull(requestWrapper.indicesStatsRequest.indices()) + } + + fun `test CatIndicesRequestWrapper validate blank pathParams`() { + // GIVEN + val pathParams = " " + + // WHEN + val requestWrapper = CatIndicesRequestWrapper(pathParams = pathParams) + + // THEN + assertNull(requestWrapper.clusterHealthRequest.indices()) + assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices()) + assertEquals(Strings.EMPTY_ARRAY, requestWrapper.indexSettingsRequest.indices()) + assertNull(requestWrapper.indicesStatsRequest.indices()) + } + + fun `test CatIndicesRequestWrapper validate empty pathParams`() { + // GIVEN + val pathParams = "" + + // WHEN + val requestWrapper = CatIndicesRequestWrapper(pathParams = pathParams) + + // THEN + assertNull(requestWrapper.clusterHealthRequest.indices()) + assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices()) + assertEquals(Strings.EMPTY_ARRAY, requestWrapper.indexSettingsRequest.indices()) + assertNull(requestWrapper.indicesStatsRequest.indices()) + } + + fun `test CatIndicesRequestWrapper validate invalid pathParams`() { + // GIVEN + val pathParams = "_index1,index^2" + + // WHEN & THEN + assertThrows(IllegalArgumentException::class.java) { CatIndicesRequestWrapper(pathParams = pathParams) } + } + + fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() { + // GIVEN + val testIndices = (1..5).map { + "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10) + }.toMap() + + testIndices.forEach { (indexName, docCount) -> + repeat(docCount) { + val docId = (it + 1).toString() + val docMessage = """ + { + "message": "$indexName doc num $docId" + } + """.trimIndent() + indexDoc(indexName, docId, docMessage) + } + } + + /* + Creating a subset of indices to use for the pathParams to test that all indices on the cluster ARE NOT returned. + */ + val pathParamsIndices = testIndices.keys.toList().subList(1, testIndices.size - 1) + val pathParams = pathParamsIndices.joinToString(",") + val input = randomClusterMetricsInput(path = path, pathParams = pathParams) + + // WHEN + val responseMap = (executeTransportAction(input, client())).toMap() + + // THEN + val shards = responseMap[WRAPPER_FIELD] as List> + val returnedIndices = + shards.map { (it[CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD] as String) to it }.toMap() + + assertEquals(pathParamsIndices.size, returnedIndices.keys.size) + testIndices.forEach { (indexName, docCount) -> + if (pathParamsIndices.contains(indexName)) { + assertEquals(indexName, returnedIndices[indexName] + ?.get(CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD) as String) + assertEquals(docCount.toString(), returnedIndices[indexName] + ?.get(CatIndicesResponseWrapper.IndexInfo.DOCS_COUNT_FIELD) as String) + } + } + } + + fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() { + // GIVEN + val testIndices = (1..5).map { + "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10) + }.toMap() + + testIndices.forEach { (indexName, docCount) -> + repeat(docCount) { + val docId = (it + 1).toString() + val docMessage = """ + { + "message": "$indexName doc num $docId" + } + """.trimIndent() + indexDoc(indexName, docId, docMessage) + } + } + + val input = randomClusterMetricsInput(path = path) + + // WHEN + val responseMap = (executeTransportAction(input, client())).toMap() + + // THEN + val shards = responseMap[WRAPPER_FIELD] as List> + val returnedIndices = + shards.map { (it[CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD] as String) to it }.toMap() + + assertEquals(testIndices.size, returnedIndices.keys.size) + testIndices.forEach { (indexName, docCount) -> + assertEquals(indexName, returnedIndices[indexName] + ?.get(CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD) as String) + assertEquals(docCount.toString(), returnedIndices[indexName] + ?.get(CatIndicesResponseWrapper.IndexInfo.DOCS_COUNT_FIELD) as String) + } + } + + private fun indexDoc(index: String, id: String, doc: String) { + client().prepareIndex(index).setId(id) + .setSource(doc, XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get() + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt new file mode 100644 index 000000000..d34f08509 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt @@ -0,0 +1,157 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util.clusterMetricsMonitorHelpers + +import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.randomClusterMetricsInput +import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.CatShardsResponseWrapper.Companion.WRAPPER_FIELD +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.model.ClusterMetricsInput +import org.opensearch.core.common.Strings +import org.opensearch.test.OpenSearchSingleNodeTestCase + +class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() { + private val path = ClusterMetricsInput.ClusterMetricType.CAT_SHARDS.defaultPath + + fun `test CatShardsRequestWrapper validate valid pathParams`() { + // GIVEN + val pathParams = "index1,index_2,index-3" + + // WHEN + val requestWrapper = CatShardsRequestWrapper(pathParams = pathParams) + + // THEN + assertEquals(3, requestWrapper.clusterStateRequest.indices().size) + assertEquals(3, requestWrapper.indicesStatsRequest.indices().size) + } + + fun `test CatShardsRequestWrapper validate without providing pathParams`() { + // GIVEN & WHEN + val requestWrapper = CatShardsRequestWrapper() + + // THEN + assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices()) + assertNull(requestWrapper.indicesStatsRequest.indices()) + } + + fun `test CatShardsRequestWrapper validate blank pathParams`() { + // GIVEN + val pathParams = " " + + // WHEN + val requestWrapper = CatShardsRequestWrapper(pathParams = pathParams) + + // THEN + assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices()) + assertNull(requestWrapper.indicesStatsRequest.indices()) + } + + fun `test CatShardsRequestWrapper validate empty pathParams`() { + // GIVEN + val pathParams = "" + + // WHEN + val requestWrapper = CatShardsRequestWrapper(pathParams = pathParams) + + // THEN + assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices()) + assertNull(requestWrapper.indicesStatsRequest.indices()) + } + + fun `test CatShardsRequestWrapper validate invalid pathParams`() { + // GIVEN + val pathParams = "_index1,index^2" + + // WHEN & THEN + assertThrows(IllegalArgumentException::class.java) { CatShardsRequestWrapper(pathParams = pathParams) } + } + + fun `test CatShardsResponseWrapper returns with only indices in pathParams`() { + // GIVEN + val testIndices = (1..5).map { + "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10) + }.toMap() + + testIndices.forEach { (indexName, docCount) -> + repeat(docCount) { + val docId = (it + 1).toString() + val docMessage = """ + { + "message": "$indexName doc num $docId" + } + """.trimIndent() + indexDoc(indexName, docId, docMessage) + } + } + + /* + Creating a subset of indices to use for the pathParams to test that all indices on the cluster ARE NOT returned. + */ + val pathParamsIndices = testIndices.keys.toList().subList(1, testIndices.size - 1) + val pathParams = pathParamsIndices.joinToString(",") + val input = randomClusterMetricsInput(path = path, pathParams = pathParams) + + // WHEN + val responseMap = (executeTransportAction(input, client())).toMap() + + // THEN + val shards = responseMap[WRAPPER_FIELD] as List> + val returnedIndices = + shards.map { (it[CatShardsResponseWrapper.ShardInfo.INDEX_FIELD] as String) to it }.toMap() + + assertEquals(pathParamsIndices.size, returnedIndices.keys.size) + testIndices.forEach { (indexName, docCount) -> + if (pathParamsIndices.contains(indexName)) { + assertEquals(indexName, returnedIndices[indexName] + ?.get(CatShardsResponseWrapper.ShardInfo.INDEX_FIELD) as String) + assertEquals(docCount.toString(), returnedIndices[indexName] + ?.get(CatShardsResponseWrapper.ShardInfo.DOCS_FIELD) as String) + } + } + } + + fun `test CatShardsResponseWrapper returns with all indices when empty pathParams`() { + // GIVEN + val testIndices = (1..5).map { + "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10) + }.toMap() + + testIndices.forEach { (indexName, docCount) -> + repeat(docCount) { + val docId = (it + 1).toString() + val docMessage = """ + { + "message": "$indexName doc num $docId" + } + """.trimIndent() + indexDoc(indexName, docId, docMessage) + } + } + + val input = randomClusterMetricsInput(path = path) + + // WHEN + val responseMap = (executeTransportAction(input, client())).toMap() + + // THEN + val shards = responseMap[WRAPPER_FIELD] as List> + val returnedIndices = + shards.map { (it[CatShardsResponseWrapper.ShardInfo.INDEX_FIELD] as String) to it }.toMap() + + assertEquals(testIndices.size, returnedIndices.keys.size) + testIndices.forEach { (indexName, docCount) -> + assertEquals(indexName, returnedIndices[indexName] + ?.get(CatShardsResponseWrapper.ShardInfo.INDEX_FIELD) as String) + assertEquals(docCount.toString(), returnedIndices[indexName] + ?.get(CatShardsResponseWrapper.ShardInfo.DOCS_FIELD) as String) + } + } + + private fun indexDoc(index: String, id: String, doc: String) { + client().prepareIndex(index).setId(id) + .setSource(doc, XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get() + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensionsTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensionsTests.kt similarity index 98% rename from alerting/src/test/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensionsTests.kt rename to alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensionsTests.kt index bb59ff7d1..bfe5b8dce 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensionsTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensionsTests.kt @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.alerting.util +package org.opensearch.alerting.util.clusterMetricsMonitorHelpers import org.opensearch.test.OpenSearchTestCase From 6b9ea2b2a4a2e81747eb1437f7374b01b833a7c6 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Mon, 10 Jul 2023 20:16:50 -0700 Subject: [PATCH 2/5] Fixed ktlint errors. Signed-off-by: AWSHurneyt --- .../CatIndicesWrappersIT.kt | 24 ++++++++++++------- .../CatShardsWrappersIT.kt | 24 ++++++++++++------- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt index db8a128d1..c63ebb51f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt @@ -113,10 +113,14 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() { assertEquals(pathParamsIndices.size, returnedIndices.keys.size) testIndices.forEach { (indexName, docCount) -> if (pathParamsIndices.contains(indexName)) { - assertEquals(indexName, returnedIndices[indexName] - ?.get(CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD) as String) - assertEquals(docCount.toString(), returnedIndices[indexName] - ?.get(CatIndicesResponseWrapper.IndexInfo.DOCS_COUNT_FIELD) as String) + assertEquals( + indexName, + returnedIndices[indexName]?.get(CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD) as String + ) + assertEquals( + docCount.toString(), + returnedIndices[indexName]?.get(CatIndicesResponseWrapper.IndexInfo.DOCS_COUNT_FIELD) as String + ) } } } @@ -151,10 +155,14 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() { assertEquals(testIndices.size, returnedIndices.keys.size) testIndices.forEach { (indexName, docCount) -> - assertEquals(indexName, returnedIndices[indexName] - ?.get(CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD) as String) - assertEquals(docCount.toString(), returnedIndices[indexName] - ?.get(CatIndicesResponseWrapper.IndexInfo.DOCS_COUNT_FIELD) as String) + assertEquals( + indexName, + returnedIndices[indexName]?.get(CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD) as String + ) + assertEquals( + docCount.toString(), + returnedIndices[indexName]?.get(CatIndicesResponseWrapper.IndexInfo.DOCS_COUNT_FIELD) as String + ) } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt index d34f08509..d945e9b2b 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt @@ -105,10 +105,14 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() { assertEquals(pathParamsIndices.size, returnedIndices.keys.size) testIndices.forEach { (indexName, docCount) -> if (pathParamsIndices.contains(indexName)) { - assertEquals(indexName, returnedIndices[indexName] - ?.get(CatShardsResponseWrapper.ShardInfo.INDEX_FIELD) as String) - assertEquals(docCount.toString(), returnedIndices[indexName] - ?.get(CatShardsResponseWrapper.ShardInfo.DOCS_FIELD) as String) + assertEquals( + indexName, + returnedIndices[indexName]?.get(CatShardsResponseWrapper.ShardInfo.INDEX_FIELD) as String + ) + assertEquals( + docCount.toString(), + returnedIndices[indexName]?.get(CatShardsResponseWrapper.ShardInfo.DOCS_FIELD) as String + ) } } } @@ -143,10 +147,14 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() { assertEquals(testIndices.size, returnedIndices.keys.size) testIndices.forEach { (indexName, docCount) -> - assertEquals(indexName, returnedIndices[indexName] - ?.get(CatShardsResponseWrapper.ShardInfo.INDEX_FIELD) as String) - assertEquals(docCount.toString(), returnedIndices[indexName] - ?.get(CatShardsResponseWrapper.ShardInfo.DOCS_FIELD) as String) + assertEquals( + indexName, + returnedIndices[indexName]?.get(CatShardsResponseWrapper.ShardInfo.INDEX_FIELD) as String + ) + assertEquals( + docCount.toString(), + returnedIndices[indexName]?.get(CatShardsResponseWrapper.ShardInfo.DOCS_FIELD) as String + ) } } From f76928f15d7a833a637b99023bc969b87eeba6f8 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Tue, 11 Jul 2023 10:17:47 -0700 Subject: [PATCH 3/5] Refactored executeTransportAction to use suspendUntil() instead of get() to receive responses. Signed-off-by: AWSHurneyt --- ...pportedClusterMetricsSettingsExtensions.kt | 51 +++++++++++++------ .../CatIndicesWrappersIT.kt | 4 +- .../CatShardsWrappersIT.kt | 4 +- 3 files changed, 39 insertions(+), 20 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt index bf5c720c4..3dc35e6c4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt @@ -16,16 +16,21 @@ import org.opensearch.action.admin.cluster.settings.ClusterGetSettingsResponse import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse import org.opensearch.action.admin.cluster.state.ClusterStateRequest +import org.opensearch.action.admin.cluster.state.ClusterStateResponse import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksResponse import org.opensearch.action.admin.indices.recovery.RecoveryRequest import org.opensearch.action.admin.indices.recovery.RecoveryResponse +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse import org.opensearch.alerting.opensearchapi.convertToMap +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.SupportedClusterMetricsSettings import org.opensearch.alerting.settings.SupportedClusterMetricsSettings.Companion.resolveToActionRequest import org.opensearch.client.Client +import org.opensearch.cluster.metadata.Metadata import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.support.XContentMapValues import org.opensearch.commons.alerting.model.ClusterMetricsInput @@ -38,35 +43,49 @@ import kotlin.collections.HashMap * @param client The [Client] used to call the respective transport action. * @throws IllegalArgumentException When the requested API is not supported by this feature. */ -fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse { +suspend fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse { val request = resolveToActionRequest(clusterMetricsInput) return when (clusterMetricsInput.clusterMetricType) { ClusterMetricsInput.ClusterMetricType.CAT_INDICES -> { request as CatIndicesRequestWrapper - val healthResponse = client.admin().cluster().health(request.clusterHealthRequest).get() - val indexSettingsResponse = client.admin().indices().getSettings(request.indexSettingsRequest).get() - val indicesResponse = client.admin().indices().stats(request.indicesStatsRequest).get() - val stateResponse = client.admin().cluster().state(request.clusterStateRequest).get() + val healthResponse: ClusterHealthResponse = + client.suspendUntil { admin().cluster().health(request.clusterHealthRequest) } + val indexSettingsResponse: GetSettingsResponse = + client.suspendUntil { client.admin().indices().getSettings(request.indexSettingsRequest) } + val indicesResponse: IndicesStatsResponse = + client.suspendUntil { client.admin().indices().stats(request.indicesStatsRequest) } + val stateResponse: ClusterStateResponse = + client.suspendUntil { client.admin().cluster().state(request.clusterStateRequest) } return CatIndicesResponseWrapper(healthResponse, stateResponse, indexSettingsResponse, indicesResponse) } - ClusterMetricsInput.ClusterMetricType.CAT_PENDING_TASKS -> client.admin().cluster() - .pendingClusterTasks(request as PendingClusterTasksRequest).get() - ClusterMetricsInput.ClusterMetricType.CAT_RECOVERY -> client.admin().indices().recoveries(request as RecoveryRequest).get() + ClusterMetricsInput.ClusterMetricType.CAT_PENDING_TASKS -> + client.suspendUntil { client.admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest) } + ClusterMetricsInput.ClusterMetricType.CAT_RECOVERY -> + client.suspendUntil { client.admin().indices().recoveries(request as RecoveryRequest) } ClusterMetricsInput.ClusterMetricType.CAT_SHARDS -> { request as CatShardsRequestWrapper - val stateResponse = client.admin().cluster().state(request.clusterStateRequest).get() - val indicesResponse = client.admin().indices().stats(request.indicesStatsRequest).get() + val stateResponse: ClusterStateResponse = + client.suspendUntil { client.admin().cluster().state(request.clusterStateRequest) } + val indicesResponse: IndicesStatsResponse = + client.suspendUntil { client.admin().indices().stats(request.indicesStatsRequest) } return CatShardsResponseWrapper(stateResponse, indicesResponse) } - ClusterMetricsInput.ClusterMetricType.CAT_SNAPSHOTS -> client.admin().cluster().getSnapshots(request as GetSnapshotsRequest).get() - ClusterMetricsInput.ClusterMetricType.CAT_TASKS -> client.admin().cluster().listTasks(request as ListTasksRequest).get() - ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH -> client.admin().cluster().health(request as ClusterHealthRequest).get() + ClusterMetricsInput.ClusterMetricType.CAT_SNAPSHOTS -> + client.suspendUntil { client.admin().cluster().getSnapshots(request as GetSnapshotsRequest) } + ClusterMetricsInput.ClusterMetricType.CAT_TASKS -> + client.suspendUntil { client.admin().cluster().listTasks(request as ListTasksRequest) } + ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH -> + client.suspendUntil { client.admin().cluster().health(request as ClusterHealthRequest) } ClusterMetricsInput.ClusterMetricType.CLUSTER_SETTINGS -> { - val metadata = client.admin().cluster().state(request as ClusterStateRequest).get().state.metadata + val stateResponse: ClusterStateResponse = + client.suspendUntil { client.admin().cluster().state(request as ClusterStateRequest) } + val metadata: Metadata = stateResponse.state.metadata return ClusterGetSettingsResponse(metadata.persistentSettings(), metadata.transientSettings(), Settings.EMPTY) } - ClusterMetricsInput.ClusterMetricType.CLUSTER_STATS -> client.admin().cluster().clusterStats(request as ClusterStatsRequest).get() - ClusterMetricsInput.ClusterMetricType.NODES_STATS -> client.admin().cluster().nodesStats(request as NodesStatsRequest).get() + ClusterMetricsInput.ClusterMetricType.CLUSTER_STATS -> + client.suspendUntil { client.admin().cluster().clusterStats(request as ClusterStatsRequest) } + ClusterMetricsInput.ClusterMetricType.NODES_STATS -> + client.suspendUntil { client.admin().cluster().nodesStats(request as NodesStatsRequest) } else -> throw IllegalArgumentException("Unsupported API request type: ${request.javaClass.name}") } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt index c63ebb51f..9712b4213 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt @@ -77,7 +77,7 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() { assertThrows(IllegalArgumentException::class.java) { CatIndicesRequestWrapper(pathParams = pathParams) } } - fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() { + suspend fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() { // GIVEN val testIndices = (1..5).map { "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10) @@ -125,7 +125,7 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() { } } - fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() { + suspend fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() { // GIVEN val testIndices = (1..5).map { "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt index d945e9b2b..c8b5db561 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt @@ -69,7 +69,7 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() { assertThrows(IllegalArgumentException::class.java) { CatShardsRequestWrapper(pathParams = pathParams) } } - fun `test CatShardsResponseWrapper returns with only indices in pathParams`() { + suspend fun `test CatShardsResponseWrapper returns with only indices in pathParams`() { // GIVEN val testIndices = (1..5).map { "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10) @@ -117,7 +117,7 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() { } } - fun `test CatShardsResponseWrapper returns with all indices when empty pathParams`() { + suspend fun `test CatShardsResponseWrapper returns with all indices when empty pathParams`() { // GIVEN val testIndices = (1..5).map { "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10) From 6e4076367297feba68f3cdfdadc783d0c4acc388 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Tue, 11 Jul 2023 20:14:04 -0700 Subject: [PATCH 4/5] Fixed ktlint errors. Signed-off-by: AWSHurneyt --- .../clusterMetricsMonitorHelpers/CatIndicesHelpers.kt | 4 +++- .../clusterMetricsMonitorHelpers/CatShardsHelpers.kt | 10 +++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt index d3447ed99..38420c98a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt @@ -53,7 +53,9 @@ class CatIndicesRequestWrapper(val pathParams: String = "") : ActionRequest() { if (pathParams.isNotBlank()) { indicesList = pathParams.split(",").toTypedArray() - require(validate() == null) { "The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases." } + require(validate() == null) { + "The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases." + } clusterHealthRequest = clusterHealthRequest.indices(*indicesList) clusterStateRequest = clusterStateRequest.indices(*indicesList) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt index 47c35eca8..cde22918b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt @@ -52,7 +52,9 @@ class CatShardsRequestWrapper(val pathParams: String = "") : ActionRequest() { if (pathParams.isNotBlank()) { indicesList = pathParams.split(",").toTypedArray() - require(validate() == null) { "The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases." } + require(validate() == null) { + "The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases." + } clusterStateRequest = clusterStateRequest.indices(*indicesList) indicesStatsRequest = indicesStatsRequest.indices(*indicesList) @@ -173,7 +175,8 @@ class CatShardsResponseWrapper( searchScrollTotal = getOrNull(commonStats, CommonStats::getSearch, { it.total.scrollCount })?.toString(), segmentsCount = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getCount)?.toString(), segmentsMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getZeroMemory)?.toString(), - segmentsIndexWriterMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getIndexWriterMemory)?.toString(), + segmentsIndexWriterMemory = + getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getIndexWriterMemory)?.toString(), segmentsVersionMapMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getVersionMapMemory)?.toString(), fixedBitsetMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getBitsetMemory)?.toString(), globalCheckpoint = getOrNull(shardStats, ShardStats::getSeqNoStats, SeqNoStats::getGlobalCheckpoint)?.toString(), @@ -211,7 +214,8 @@ class CatShardsResponseWrapper( shardInfo = shardInfo.copy( unassignedReason = shard.unassignedInfo().reason.name, unassignedAt = UnassignedInfo.DATE_TIME_FORMATTER.format(unassignedTime), - unassignedFor = TimeValue.timeValueMillis(System.currentTimeMillis() - shard.unassignedInfo().unassignedTimeInMillis).stringRep, + unassignedFor = + TimeValue.timeValueMillis(System.currentTimeMillis() - shard.unassignedInfo().unassignedTimeInMillis).stringRep, unassignedDetails = shard.unassignedInfo().details ) } From 3eb7a4183abc2f0a21d47d3a660e970ba3541ad5 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Tue, 11 Jul 2023 21:59:28 -0700 Subject: [PATCH 5/5] Refactored API calls from suspendUntil() to get(). Signed-off-by: AWSHurneyt --- ...pportedClusterMetricsSettingsExtensions.kt | 31 +++++++++---------- .../CatIndicesWrappersIT.kt | 4 +-- .../CatShardsWrappersIT.kt | 4 +-- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt index 3dc35e6c4..fd596d202 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt @@ -26,7 +26,6 @@ import org.opensearch.action.admin.indices.recovery.RecoveryResponse import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse import org.opensearch.action.admin.indices.stats.IndicesStatsResponse import org.opensearch.alerting.opensearchapi.convertToMap -import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.SupportedClusterMetricsSettings import org.opensearch.alerting.settings.SupportedClusterMetricsSettings.Companion.resolveToActionRequest import org.opensearch.client.Client @@ -43,49 +42,49 @@ import kotlin.collections.HashMap * @param client The [Client] used to call the respective transport action. * @throws IllegalArgumentException When the requested API is not supported by this feature. */ -suspend fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse { +fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse { val request = resolveToActionRequest(clusterMetricsInput) return when (clusterMetricsInput.clusterMetricType) { ClusterMetricsInput.ClusterMetricType.CAT_INDICES -> { request as CatIndicesRequestWrapper val healthResponse: ClusterHealthResponse = - client.suspendUntil { admin().cluster().health(request.clusterHealthRequest) } + client.admin().cluster().health(request.clusterHealthRequest).get() val indexSettingsResponse: GetSettingsResponse = - client.suspendUntil { client.admin().indices().getSettings(request.indexSettingsRequest) } + client.admin().indices().getSettings(request.indexSettingsRequest).get() val indicesResponse: IndicesStatsResponse = - client.suspendUntil { client.admin().indices().stats(request.indicesStatsRequest) } + client.admin().indices().stats(request.indicesStatsRequest).get() val stateResponse: ClusterStateResponse = - client.suspendUntil { client.admin().cluster().state(request.clusterStateRequest) } + client.admin().cluster().state(request.clusterStateRequest).get() return CatIndicesResponseWrapper(healthResponse, stateResponse, indexSettingsResponse, indicesResponse) } ClusterMetricsInput.ClusterMetricType.CAT_PENDING_TASKS -> - client.suspendUntil { client.admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest) } + client.admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest).get() ClusterMetricsInput.ClusterMetricType.CAT_RECOVERY -> - client.suspendUntil { client.admin().indices().recoveries(request as RecoveryRequest) } + client.admin().indices().recoveries(request as RecoveryRequest).get() ClusterMetricsInput.ClusterMetricType.CAT_SHARDS -> { request as CatShardsRequestWrapper val stateResponse: ClusterStateResponse = - client.suspendUntil { client.admin().cluster().state(request.clusterStateRequest) } + client.admin().cluster().state(request.clusterStateRequest).get() val indicesResponse: IndicesStatsResponse = - client.suspendUntil { client.admin().indices().stats(request.indicesStatsRequest) } + client.admin().indices().stats(request.indicesStatsRequest).get() return CatShardsResponseWrapper(stateResponse, indicesResponse) } ClusterMetricsInput.ClusterMetricType.CAT_SNAPSHOTS -> - client.suspendUntil { client.admin().cluster().getSnapshots(request as GetSnapshotsRequest) } + client.admin().cluster().getSnapshots(request as GetSnapshotsRequest).get() ClusterMetricsInput.ClusterMetricType.CAT_TASKS -> - client.suspendUntil { client.admin().cluster().listTasks(request as ListTasksRequest) } + client.admin().cluster().listTasks(request as ListTasksRequest).get() ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH -> - client.suspendUntil { client.admin().cluster().health(request as ClusterHealthRequest) } + client.admin().cluster().health(request as ClusterHealthRequest).get() ClusterMetricsInput.ClusterMetricType.CLUSTER_SETTINGS -> { val stateResponse: ClusterStateResponse = - client.suspendUntil { client.admin().cluster().state(request as ClusterStateRequest) } + client.admin().cluster().state(request as ClusterStateRequest).get() val metadata: Metadata = stateResponse.state.metadata return ClusterGetSettingsResponse(metadata.persistentSettings(), metadata.transientSettings(), Settings.EMPTY) } ClusterMetricsInput.ClusterMetricType.CLUSTER_STATS -> - client.suspendUntil { client.admin().cluster().clusterStats(request as ClusterStatsRequest) } + client.admin().cluster().clusterStats(request as ClusterStatsRequest).get() ClusterMetricsInput.ClusterMetricType.NODES_STATS -> - client.suspendUntil { client.admin().cluster().nodesStats(request as NodesStatsRequest) } + client.admin().cluster().nodesStats(request as NodesStatsRequest).get() else -> throw IllegalArgumentException("Unsupported API request type: ${request.javaClass.name}") } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt index 9712b4213..c63ebb51f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt @@ -77,7 +77,7 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() { assertThrows(IllegalArgumentException::class.java) { CatIndicesRequestWrapper(pathParams = pathParams) } } - suspend fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() { + fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() { // GIVEN val testIndices = (1..5).map { "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10) @@ -125,7 +125,7 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() { } } - suspend fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() { + fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() { // GIVEN val testIndices = (1..5).map { "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt index c8b5db561..d945e9b2b 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt @@ -69,7 +69,7 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() { assertThrows(IllegalArgumentException::class.java) { CatShardsRequestWrapper(pathParams = pathParams) } } - suspend fun `test CatShardsResponseWrapper returns with only indices in pathParams`() { + fun `test CatShardsResponseWrapper returns with only indices in pathParams`() { // GIVEN val testIndices = (1..5).map { "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10) @@ -117,7 +117,7 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() { } } - suspend fun `test CatShardsResponseWrapper returns with all indices when empty pathParams`() { + fun `test CatShardsResponseWrapper returns with all indices when empty pathParams`() { // GIVEN val testIndices = (1..5).map { "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)