diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt index d3447ed99..38420c98a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt @@ -53,7 +53,9 @@ class CatIndicesRequestWrapper(val pathParams: String = "") : ActionRequest() { if (pathParams.isNotBlank()) { indicesList = pathParams.split(",").toTypedArray() - require(validate() == null) { "The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases." } + require(validate() == null) { + "The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases." + } clusterHealthRequest = clusterHealthRequest.indices(*indicesList) clusterStateRequest = clusterStateRequest.indices(*indicesList) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt index 47c35eca8..cde22918b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt @@ -52,7 +52,9 @@ class CatShardsRequestWrapper(val pathParams: String = "") : ActionRequest() { if (pathParams.isNotBlank()) { indicesList = pathParams.split(",").toTypedArray() - require(validate() == null) { "The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases." } + require(validate() == null) { + "The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases." + } clusterStateRequest = clusterStateRequest.indices(*indicesList) indicesStatsRequest = indicesStatsRequest.indices(*indicesList) @@ -173,7 +175,8 @@ class CatShardsResponseWrapper( searchScrollTotal = getOrNull(commonStats, CommonStats::getSearch, { it.total.scrollCount })?.toString(), segmentsCount = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getCount)?.toString(), segmentsMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getZeroMemory)?.toString(), - segmentsIndexWriterMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getIndexWriterMemory)?.toString(), + segmentsIndexWriterMemory = + getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getIndexWriterMemory)?.toString(), segmentsVersionMapMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getVersionMapMemory)?.toString(), fixedBitsetMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getBitsetMemory)?.toString(), globalCheckpoint = getOrNull(shardStats, ShardStats::getSeqNoStats, SeqNoStats::getGlobalCheckpoint)?.toString(), @@ -211,7 +214,8 @@ class CatShardsResponseWrapper( shardInfo = shardInfo.copy( unassignedReason = shard.unassignedInfo().reason.name, unassignedAt = UnassignedInfo.DATE_TIME_FORMATTER.format(unassignedTime), - unassignedFor = TimeValue.timeValueMillis(System.currentTimeMillis() - shard.unassignedInfo().unassignedTimeInMillis).stringRep, + unassignedFor = + TimeValue.timeValueMillis(System.currentTimeMillis() - shard.unassignedInfo().unassignedTimeInMillis).stringRep, unassignedDetails = shard.unassignedInfo().details ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt index 3dc35e6c4..fd596d202 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt @@ -26,7 +26,6 @@ import org.opensearch.action.admin.indices.recovery.RecoveryResponse import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse import org.opensearch.action.admin.indices.stats.IndicesStatsResponse import org.opensearch.alerting.opensearchapi.convertToMap -import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.SupportedClusterMetricsSettings import org.opensearch.alerting.settings.SupportedClusterMetricsSettings.Companion.resolveToActionRequest import org.opensearch.client.Client @@ -43,49 +42,49 @@ import kotlin.collections.HashMap * @param client The [Client] used to call the respective transport action. * @throws IllegalArgumentException When the requested API is not supported by this feature. */ -suspend fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse { +fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse { val request = resolveToActionRequest(clusterMetricsInput) return when (clusterMetricsInput.clusterMetricType) { ClusterMetricsInput.ClusterMetricType.CAT_INDICES -> { request as CatIndicesRequestWrapper val healthResponse: ClusterHealthResponse = - client.suspendUntil { admin().cluster().health(request.clusterHealthRequest) } + client.admin().cluster().health(request.clusterHealthRequest).get() val indexSettingsResponse: GetSettingsResponse = - client.suspendUntil { client.admin().indices().getSettings(request.indexSettingsRequest) } + client.admin().indices().getSettings(request.indexSettingsRequest).get() val indicesResponse: IndicesStatsResponse = - client.suspendUntil { client.admin().indices().stats(request.indicesStatsRequest) } + client.admin().indices().stats(request.indicesStatsRequest).get() val stateResponse: ClusterStateResponse = - client.suspendUntil { client.admin().cluster().state(request.clusterStateRequest) } + client.admin().cluster().state(request.clusterStateRequest).get() return CatIndicesResponseWrapper(healthResponse, stateResponse, indexSettingsResponse, indicesResponse) } ClusterMetricsInput.ClusterMetricType.CAT_PENDING_TASKS -> - client.suspendUntil { client.admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest) } + client.admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest).get() ClusterMetricsInput.ClusterMetricType.CAT_RECOVERY -> - client.suspendUntil { client.admin().indices().recoveries(request as RecoveryRequest) } + client.admin().indices().recoveries(request as RecoveryRequest).get() ClusterMetricsInput.ClusterMetricType.CAT_SHARDS -> { request as CatShardsRequestWrapper val stateResponse: ClusterStateResponse = - client.suspendUntil { client.admin().cluster().state(request.clusterStateRequest) } + client.admin().cluster().state(request.clusterStateRequest).get() val indicesResponse: IndicesStatsResponse = - client.suspendUntil { client.admin().indices().stats(request.indicesStatsRequest) } + client.admin().indices().stats(request.indicesStatsRequest).get() return CatShardsResponseWrapper(stateResponse, indicesResponse) } ClusterMetricsInput.ClusterMetricType.CAT_SNAPSHOTS -> - client.suspendUntil { client.admin().cluster().getSnapshots(request as GetSnapshotsRequest) } + client.admin().cluster().getSnapshots(request as GetSnapshotsRequest).get() ClusterMetricsInput.ClusterMetricType.CAT_TASKS -> - client.suspendUntil { client.admin().cluster().listTasks(request as ListTasksRequest) } + client.admin().cluster().listTasks(request as ListTasksRequest).get() ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH -> - client.suspendUntil { client.admin().cluster().health(request as ClusterHealthRequest) } + client.admin().cluster().health(request as ClusterHealthRequest).get() ClusterMetricsInput.ClusterMetricType.CLUSTER_SETTINGS -> { val stateResponse: ClusterStateResponse = - client.suspendUntil { client.admin().cluster().state(request as ClusterStateRequest) } + client.admin().cluster().state(request as ClusterStateRequest).get() val metadata: Metadata = stateResponse.state.metadata return ClusterGetSettingsResponse(metadata.persistentSettings(), metadata.transientSettings(), Settings.EMPTY) } ClusterMetricsInput.ClusterMetricType.CLUSTER_STATS -> - client.suspendUntil { client.admin().cluster().clusterStats(request as ClusterStatsRequest) } + client.admin().cluster().clusterStats(request as ClusterStatsRequest).get() ClusterMetricsInput.ClusterMetricType.NODES_STATS -> - client.suspendUntil { client.admin().cluster().nodesStats(request as NodesStatsRequest) } + client.admin().cluster().nodesStats(request as NodesStatsRequest).get() else -> throw IllegalArgumentException("Unsupported API request type: ${request.javaClass.name}") } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt index 9712b4213..c63ebb51f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt @@ -77,7 +77,7 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() { assertThrows(IllegalArgumentException::class.java) { CatIndicesRequestWrapper(pathParams = pathParams) } } - suspend fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() { + fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() { // GIVEN val testIndices = (1..5).map { "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10) @@ -125,7 +125,7 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() { } } - suspend fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() { + fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() { // GIVEN val testIndices = (1..5).map { "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt index c8b5db561..d945e9b2b 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt @@ -69,7 +69,7 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() { assertThrows(IllegalArgumentException::class.java) { CatShardsRequestWrapper(pathParams = pathParams) } } - suspend fun `test CatShardsResponseWrapper returns with only indices in pathParams`() { + fun `test CatShardsResponseWrapper returns with only indices in pathParams`() { // GIVEN val testIndices = (1..5).map { "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10) @@ -117,7 +117,7 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() { } } - suspend fun `test CatShardsResponseWrapper returns with all indices when empty pathParams`() { + fun `test CatShardsResponseWrapper returns with all indices when empty pathParams`() { // GIVEN val testIndices = (1..5).map { "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)