Skip to content

Commit

Permalink
Temp refactor cluster metrics monitor suspendUntil() to get(). (opens…
Browse files Browse the repository at this point in the history
…earch-project#1015)

* Implemented support for configuring a cluster metrics monitor to call cat/indices, and cat/shards.

Signed-off-by: AWSHurneyt <hurneyt@amazon.com>

* Fixed ktlint errors.

Signed-off-by: AWSHurneyt <hurneyt@amazon.com>

* Refactored executeTransportAction to use suspendUntil() instead of get() to receive responses.

Signed-off-by: AWSHurneyt <hurneyt@amazon.com>

* Fixed ktlint errors.

Signed-off-by: AWSHurneyt <hurneyt@amazon.com>

* Refactored API calls from suspendUntil() to get().

Signed-off-by: AWSHurneyt <hurneyt@amazon.com>

---------

Signed-off-by: AWSHurneyt <hurneyt@amazon.com>
  • Loading branch information
AWSHurneyt committed Jul 12, 2023
1 parent b0c3106 commit b0f2281
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ class CatIndicesRequestWrapper(val pathParams: String = "") : ActionRequest() {
if (pathParams.isNotBlank()) {
indicesList = pathParams.split(",").toTypedArray()

require(validate() == null) { "The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases." }
require(validate() == null) {
"The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases."
}

clusterHealthRequest = clusterHealthRequest.indices(*indicesList)
clusterStateRequest = clusterStateRequest.indices(*indicesList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ class CatShardsRequestWrapper(val pathParams: String = "") : ActionRequest() {
if (pathParams.isNotBlank()) {
indicesList = pathParams.split(",").toTypedArray()

require(validate() == null) { "The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases." }
require(validate() == null) {
"The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases."
}

clusterStateRequest = clusterStateRequest.indices(*indicesList)
indicesStatsRequest = indicesStatsRequest.indices(*indicesList)
Expand Down Expand Up @@ -173,7 +175,8 @@ class CatShardsResponseWrapper(
searchScrollTotal = getOrNull(commonStats, CommonStats::getSearch, { it.total.scrollCount })?.toString(),
segmentsCount = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getCount)?.toString(),
segmentsMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getZeroMemory)?.toString(),
segmentsIndexWriterMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getIndexWriterMemory)?.toString(),
segmentsIndexWriterMemory =
getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getIndexWriterMemory)?.toString(),
segmentsVersionMapMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getVersionMapMemory)?.toString(),
fixedBitsetMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getBitsetMemory)?.toString(),
globalCheckpoint = getOrNull(shardStats, ShardStats::getSeqNoStats, SeqNoStats::getGlobalCheckpoint)?.toString(),
Expand Down Expand Up @@ -211,7 +214,8 @@ class CatShardsResponseWrapper(
shardInfo = shardInfo.copy(
unassignedReason = shard.unassignedInfo().reason.name,
unassignedAt = UnassignedInfo.DATE_TIME_FORMATTER.format(unassignedTime),
unassignedFor = TimeValue.timeValueMillis(System.currentTimeMillis() - shard.unassignedInfo().unassignedTimeInMillis).stringRep,
unassignedFor =
TimeValue.timeValueMillis(System.currentTimeMillis() - shard.unassignedInfo().unassignedTimeInMillis).stringRep,
unassignedDetails = shard.unassignedInfo().details
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import org.opensearch.action.admin.indices.recovery.RecoveryResponse
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings.Companion.resolveToActionRequest
import org.opensearch.client.Client
Expand All @@ -43,49 +42,49 @@ import kotlin.collections.HashMap
* @param client The [Client] used to call the respective transport action.
* @throws IllegalArgumentException When the requested API is not supported by this feature.
*/
suspend fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse {
fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse {
val request = resolveToActionRequest(clusterMetricsInput)
return when (clusterMetricsInput.clusterMetricType) {
ClusterMetricsInput.ClusterMetricType.CAT_INDICES -> {
request as CatIndicesRequestWrapper
val healthResponse: ClusterHealthResponse =
client.suspendUntil { admin().cluster().health(request.clusterHealthRequest) }
client.admin().cluster().health(request.clusterHealthRequest).get()
val indexSettingsResponse: GetSettingsResponse =
client.suspendUntil { client.admin().indices().getSettings(request.indexSettingsRequest) }
client.admin().indices().getSettings(request.indexSettingsRequest).get()
val indicesResponse: IndicesStatsResponse =
client.suspendUntil { client.admin().indices().stats(request.indicesStatsRequest) }
client.admin().indices().stats(request.indicesStatsRequest).get()
val stateResponse: ClusterStateResponse =
client.suspendUntil { client.admin().cluster().state(request.clusterStateRequest) }
client.admin().cluster().state(request.clusterStateRequest).get()
return CatIndicesResponseWrapper(healthResponse, stateResponse, indexSettingsResponse, indicesResponse)
}
ClusterMetricsInput.ClusterMetricType.CAT_PENDING_TASKS ->
client.suspendUntil { client.admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest) }
client.admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_RECOVERY ->
client.suspendUntil { client.admin().indices().recoveries(request as RecoveryRequest) }
client.admin().indices().recoveries(request as RecoveryRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_SHARDS -> {
request as CatShardsRequestWrapper
val stateResponse: ClusterStateResponse =
client.suspendUntil { client.admin().cluster().state(request.clusterStateRequest) }
client.admin().cluster().state(request.clusterStateRequest).get()
val indicesResponse: IndicesStatsResponse =
client.suspendUntil { client.admin().indices().stats(request.indicesStatsRequest) }
client.admin().indices().stats(request.indicesStatsRequest).get()
return CatShardsResponseWrapper(stateResponse, indicesResponse)
}
ClusterMetricsInput.ClusterMetricType.CAT_SNAPSHOTS ->
client.suspendUntil { client.admin().cluster().getSnapshots(request as GetSnapshotsRequest) }
client.admin().cluster().getSnapshots(request as GetSnapshotsRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_TASKS ->
client.suspendUntil { client.admin().cluster().listTasks(request as ListTasksRequest) }
client.admin().cluster().listTasks(request as ListTasksRequest).get()
ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH ->
client.suspendUntil { client.admin().cluster().health(request as ClusterHealthRequest) }
client.admin().cluster().health(request as ClusterHealthRequest).get()
ClusterMetricsInput.ClusterMetricType.CLUSTER_SETTINGS -> {
val stateResponse: ClusterStateResponse =
client.suspendUntil { client.admin().cluster().state(request as ClusterStateRequest) }
client.admin().cluster().state(request as ClusterStateRequest).get()
val metadata: Metadata = stateResponse.state.metadata
return ClusterGetSettingsResponse(metadata.persistentSettings(), metadata.transientSettings(), Settings.EMPTY)
}
ClusterMetricsInput.ClusterMetricType.CLUSTER_STATS ->
client.suspendUntil { client.admin().cluster().clusterStats(request as ClusterStatsRequest) }
client.admin().cluster().clusterStats(request as ClusterStatsRequest).get()
ClusterMetricsInput.ClusterMetricType.NODES_STATS ->
client.suspendUntil { client.admin().cluster().nodesStats(request as NodesStatsRequest) }
client.admin().cluster().nodesStats(request as NodesStatsRequest).get()
else -> throw IllegalArgumentException("Unsupported API request type: ${request.javaClass.name}")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() {
assertThrows(IllegalArgumentException::class.java) { CatIndicesRequestWrapper(pathParams = pathParams) }
}

suspend fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() {
fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
Expand Down Expand Up @@ -125,7 +125,7 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() {
}
}

suspend fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() {
fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() {
assertThrows(IllegalArgumentException::class.java) { CatShardsRequestWrapper(pathParams = pathParams) }
}

suspend fun `test CatShardsResponseWrapper returns with only indices in pathParams`() {
fun `test CatShardsResponseWrapper returns with only indices in pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
Expand Down Expand Up @@ -117,7 +117,7 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() {
}
}

suspend fun `test CatShardsResponseWrapper returns with all indices when empty pathParams`() {
fun `test CatShardsResponseWrapper returns with all indices when empty pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
Expand Down

0 comments on commit b0f2281

Please sign in to comment.