From b6124874088801cf20527c89d00b689f5e1be6ce Mon Sep 17 00:00:00 2001
From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com>
Date: Mon, 7 Nov 2022 10:16:20 -0800
Subject: [PATCH] Adds findings in bucket level monitor (#636) (#651)

* bucket level monitor findings

Signed-off-by: Surya Sashank Nistala

* add test to verify bucket level monitor findings

Signed-off-by: Surya Sashank Nistala

* added tests. fixed document ids in bucket level monitor findings

Signed-off-by: Surya Sashank Nistala

Signed-off-by: Surya Sashank Nistala
(cherry picked from commit 5b451b988b7cad0b5a1076daa8908c2fd68db154)

Co-authored-by: Surya Sashank Nistala
---
 .../org/opensearch/alerting/AlertService.kt   |   9 +-
 .../alerting/BucketLevelMonitorRunner.kt      | 149 +++++++++++++++++-
 .../opensearch/alerting/AlertServiceTests.kt  |  20 ++-
 .../alerting/AlertingRestTestCase.kt          |   6 +-
 .../alerting/MonitorRunnerServiceIT.kt        | 138 ++++++++++++++++
 .../org/opensearch/alerting/TestHelpers.kt    |  26 +++
 6 files changed, 336 insertions(+), 12 deletions(-)

diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
index 7326c7c01..2c913c135 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
@@ -15,6 +15,7 @@ import org.opensearch.action.delete.DeleteRequest
 import org.opensearch.action.index.IndexRequest
 import org.opensearch.action.search.SearchRequest
 import org.opensearch.action.search.SearchResponse
+import org.opensearch.action.support.WriteRequest
 import org.opensearch.alerting.alerts.AlertIndices
 import org.opensearch.alerting.model.ActionRunResult
 import org.opensearch.alerting.model.QueryLevelTriggerRunResult
@@ -236,7 +237,8 @@ class AlertService(
         monitor: Monitor,
         trigger: BucketLevelTrigger,
         currentAlerts: MutableMap<String, Alert>,
-        aggResultBuckets: List<AggregationResultBucket>
+        aggResultBuckets: List<AggregationResultBucket>,
+        findings: List<String>
     ): Map<AlertCategory, List<Alert>> {
         val dedupedAlerts = mutableListOf<Alert>()
         val newAlerts = mutableListOf<Alert>()
@@ -256,7 +258,8 @@ class AlertService(
                     monitor = monitor, trigger = trigger, startTime = currentTime,
                     lastNotificationTime = null, state = Alert.State.ACTIVE, errorMessage = null,
                     errorHistory = mutableListOf(), actionExecutionResults = mutableListOf(),
-                    schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket
+                    schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket,
+                    findingIds = findings
                 )
                 newAlerts.add(newAlert)
             }
@@ -381,7 +384,7 @@ class AlertService(
         // If the index request is to be retried, the Alert is saved separately as well so that its relative ordering is maintained in
         // relation to index request in the retried bulk request for when it eventually succeeds.
         retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
-            val bulkRequest = BulkRequest().add(requestsToRetry)
+            val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
             val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) }
             // TODO: This is only used to retrieve the retryCause, could instead fetch it from the bulkResponse iteration below
             val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed }
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt
index c96f4ed57..d3f534528 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt
@@ -6,24 +6,49 @@
 package org.opensearch.alerting
 
 import org.apache.logging.log4j.LogManager
+import org.opensearch.action.bulk.BulkRequest
+import org.opensearch.action.bulk.BulkResponse
+import org.opensearch.action.index.IndexRequest
+import org.opensearch.action.search.SearchRequest
+import org.opensearch.action.search.SearchResponse
+import org.opensearch.action.support.WriteRequest
 import org.opensearch.alerting.model.ActionRunResult
 import org.opensearch.alerting.model.BucketLevelTriggerRunResult
 import org.opensearch.alerting.model.InputRunResults
 import org.opensearch.alerting.model.MonitorRunResult
 import org.opensearch.alerting.opensearchapi.InjectorContextElement
+import org.opensearch.alerting.opensearchapi.retry
+import org.opensearch.alerting.opensearchapi.suspendUntil
 import org.opensearch.alerting.opensearchapi.withClosableContext
 import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
 import org.opensearch.alerting.util.defaultToPerExecutionAction
 import org.opensearch.alerting.util.getActionExecutionPolicy
 import org.opensearch.alerting.util.getBucketKeysHash
 import org.opensearch.alerting.util.getCombinedTriggerRunResult
+import org.opensearch.common.xcontent.LoggingDeprecationHandler
+import org.opensearch.common.xcontent.ToXContent
+import org.opensearch.common.xcontent.XContentBuilder
+import org.opensearch.common.xcontent.XContentType
 import org.opensearch.commons.alerting.model.Alert
 import org.opensearch.commons.alerting.model.BucketLevelTrigger
+import org.opensearch.commons.alerting.model.Finding
 import org.opensearch.commons.alerting.model.Monitor
+import org.opensearch.commons.alerting.model.SearchInput
 import org.opensearch.commons.alerting.model.action.AlertCategory
 import org.opensearch.commons.alerting.model.action.PerAlertActionScope
 import org.opensearch.commons.alerting.model.action.PerExecutionActionScope
+import org.opensearch.commons.alerting.util.string
+import org.opensearch.index.query.BoolQueryBuilder
+import org.opensearch.index.query.QueryBuilders
+import org.opensearch.rest.RestStatus
+import org.opensearch.script.Script
+import org.opensearch.script.ScriptType
+import org.opensearch.script.TemplateScript
+import org.opensearch.search.aggregations.AggregatorFactories
+import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
+import org.opensearch.search.builder.SearchSourceBuilder
 import java.time.Instant
+import java.util.UUID
 
 object BucketLevelMonitorRunner : MonitorRunner() {
     private val logger = LogManager.getLogger(javaClass)
@@ -116,11 +141,20 @@ object BucketLevelMonitorRunner : MonitorRunner() {
              * existing Alerts in a way the user can easily view them since they will have all been moved to the history index.
              */
             if (triggerResults[trigger.id]?.error != null) continue
-
+            val findings =
+                if (monitor.triggers.size == 1 && monitor.dataSources.findingsEnabled == true) createFindings(
+                    triggerResult,
+                    monitor,
+                    monitorCtx,
+                    periodStart,
+                    periodEnd,
+                    !dryrun && monitor.id != Monitor.NO_ID
+                )
+                else emptyList()
             // TODO: Should triggerResult's aggregationResultBucket be a list? If not, getCategorizedAlertsForBucketLevelMonitor can
             // be refactored to use a map instead
             val categorizedAlerts = monitorCtx.alertService!!.getCategorizedAlertsForBucketLevelMonitor(
-                monitor, trigger, currentAlertsForTrigger, triggerResult.aggregationResultBuckets.values.toList()
+                monitor, trigger, currentAlertsForTrigger, triggerResult.aggregationResultBuckets.values.toList(), findings
             ).toMutableMap()
             val dedupedAlerts = categorizedAlerts.getOrDefault(AlertCategory.DEDUPED, emptyList())
             var newAlerts = categorizedAlerts.getOrDefault(AlertCategory.NEW, emptyList())
@@ -287,6 +321,117 @@ object BucketLevelMonitorRunner : MonitorRunner() {
         return monitorResult.copy(inputResults = firstPageOfInputResults, triggerResults = triggerResults)
     }
 
+    private suspend fun createFindings(
+        triggerResult: BucketLevelTriggerRunResult,
+        monitor: Monitor,
+        monitorCtx: MonitorRunnerExecutionContext,
+        periodStart: Instant,
+        periodEnd: Instant,
+        shouldCreateFinding: Boolean
+    ): List<String> {
+        monitor.inputs.forEach { input ->
+            if (input is SearchInput) {
+                val bucketValues: Set<String> = triggerResult.aggregationResultBuckets.keys
+                val query = input.query
+                var fieldName = ""
+                var grouByFields = 0 // if number of fields used to group by > 1 we won't calculate findings
+                for (aggFactory in (query.aggregations() as AggregatorFactories.Builder).aggregatorFactories) {
+                    val sources = (aggFactory as CompositeAggregationBuilder).sources()
+                    for (source in sources) {
+                        if (grouByFields > 0) {
+                            return listOf()
+                        }
+                        grouByFields++
+                        fieldName = source.field()
+                    }
+                }
+                if (fieldName != "") {
+                    val searchParams = mapOf(
+                        "period_start" to periodStart.toEpochMilli(),
+                        "period_end" to periodEnd.toEpochMilli()
+                    )
+                    val searchSource = monitorCtx.scriptService!!.compile(
+                        Script(
+                            ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
+                            query.toString(), searchParams
+                        ),
+                        TemplateScript.CONTEXT
+                    )
+                        .newInstance(searchParams)
+                        .execute()
+                    val sr = SearchRequest(*input.indices.toTypedArray())
+                    XContentType.JSON.xContent().createParser(monitorCtx.xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource)
+                        .use {
+                            val source = SearchSourceBuilder.fromXContent(it)
+                            val queryBuilder = if (input.query.query() == null) BoolQueryBuilder()
+                            else QueryBuilders.boolQuery().must(source.query())
+                            queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues))
+                            sr.source().query(queryBuilder)
+                        }
+                    val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
+                    return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding)
+                }
+            }
+        }
+        return listOf()
+    }
+
+    private suspend fun createFindingPerIndex(
+        searchResponse: SearchResponse,
+        monitor: Monitor,
+        monitorCtx: MonitorRunnerExecutionContext,
+        shouldCreateFinding: Boolean
+    ): List<String> {
+        val docIdsByIndexName: MutableMap<String, MutableList<String>> = mutableMapOf()
+        for (hit in searchResponse.hits.hits) {
+            val ids = docIdsByIndexName.getOrDefault(hit.index, mutableListOf())
+            ids.add(hit.id)
+            docIdsByIndexName[hit.index] = ids
+        }
+        val findings = mutableListOf<String>()
+        var requestsToRetry: MutableList<IndexRequest> = mutableListOf()
+        docIdsByIndexName.entries.forEach { it ->
+            run {
+                val finding = Finding(
+                    id = UUID.randomUUID().toString(),
+                    relatedDocIds = it.value,
+                    monitorId = monitor.id,
+                    monitorName = monitor.name,
+                    index = it.key,
+                    timestamp = Instant.now(),
+                    docLevelQueries = listOf()
+                )
+
+                val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
+                logger.debug("Findings: $findingStr")
+                if (shouldCreateFinding) {
+                    val indexRequest = IndexRequest(monitor.dataSources.findingsIndex)
+                        .source(findingStr, XContentType.JSON)
+                        .id(finding.id)
+                        .routing(finding.id)
+                    requestsToRetry.add(indexRequest)
+                }
+                findings.add(finding.id)
+            }
+        }
+        if (requestsToRetry.isEmpty()) return listOf()
+        monitorCtx.retryPolicy!!.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
+            val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.bulk(bulkRequest, it) }
+            requestsToRetry = mutableListOf()
+            val findingsBeingRetried = mutableListOf<Alert>()
+            bulkResponse.items.forEach { item ->
+                if (item.isFailed) {
+                    if (item.status() == RestStatus.TOO_MANY_REQUESTS) {
+                        requestsToRetry.add(bulkRequest.requests()[item.itemId] as IndexRequest)
+                        findingsBeingRetried.add(findingsBeingRetried[item.itemId])
+                    }
+                }
+            }
+        }
+        return findings
+    }
+
     private fun getActionContextForAlertCategory(
         alertCategory: AlertCategory,
         alert: Alert,
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt
index 6fc2055dd..982a3281e 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt
@@ -83,7 +83,9 @@ class AlertServiceTests : OpenSearchTestCase() {
             )
         )
 
-        val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets)
+        val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
+            monitor, trigger, currentAlerts, aggResultBuckets, emptyList()
+        )
         // Completed Alerts are what remains in currentAlerts after categorization
         val completedAlerts = currentAlerts.values.toList()
         assertEquals(listOf<Alert>(), categorizedAlerts[AlertCategory.DEDUPED])
@@ -115,7 +117,9 @@ class AlertServiceTests : OpenSearchTestCase() {
             )
         )
 
-        val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets)
+        val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
+            monitor, trigger, currentAlerts, aggResultBuckets, emptyList()
+        )
         // Completed Alerts are what remains in currentAlerts after categorization
         val completedAlerts = currentAlerts.values.toList()
         assertAlertsExistForBucketKeys(
@@ -142,7 +146,9 @@ class AlertServiceTests : OpenSearchTestCase() {
         )
         val aggResultBuckets = listOf<AggregationResultBucket>()
 
-        val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets)
+        val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
+            monitor, trigger, currentAlerts, aggResultBuckets, emptyList()
+        )
         // Completed Alerts are what remains in currentAlerts after categorization
         val completedAlerts = currentAlerts.values.toList()
         assertEquals(listOf<Alert>(), categorizedAlerts[AlertCategory.DEDUPED])
@@ -174,7 +180,9 @@ class AlertServiceTests : OpenSearchTestCase() {
             )
         )
 
-        val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets)
+        val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
+            monitor, trigger, currentAlerts, aggResultBuckets, emptyList()
+        )
         // Completed Alerts are what remains in currentAlerts after categorization
         val completedAlerts = currentAlerts.values.toList()
         assertAlertsExistForBucketKeys(listOf(listOf("b")), categorizedAlerts[AlertCategory.DEDUPED] ?: error("Deduped alerts not found"))
@@ -198,7 +206,9 @@ class AlertServiceTests : OpenSearchTestCase() {
             )
         )
 
-        val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets)
+        val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
+            monitor, trigger, currentAlerts, aggResultBuckets, emptyList()
+        )
         // Completed Alerts are what remains in currentAlerts after categorization
         val completedAlerts = currentAlerts.values.toList()
         assertAlertsExistForBucketKeys(listOf(listOf("a")), categorizedAlerts[AlertCategory.DEDUPED] ?: error("Deduped alerts not found"))
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt
index bacdd2c40..952c7f1db 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt
@@ -779,7 +779,8 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
             """
                 "properties" : {
                   "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
-                  "test_field" : { "type" : "keyword" }
+                  "test_field" : { "type" : "keyword" },
+                  "number" : { "type" : "keyword" }
                 }
             """.trimIndent()
         )
@@ -866,7 +867,8 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
             val testDoc = """
                 {
                   "test_strict_date_time": "$testTime",
-                  "test_field": "$value"
+                  "test_field": "$value",
+                  "number": "$i"
                 }
             """.trimIndent()
             // Indexing documents with deterministic doc id to allow for easy selected deletion during testing
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt
index 007fcf4b0..8b99ec5dd 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt
@@ -25,6 +25,7 @@ import org.opensearch.commons.alerting.model.Alert.State.ACKNOWLEDGED
 import org.opensearch.commons.alerting.model.Alert.State.ACTIVE
 import org.opensearch.commons.alerting.model.Alert.State.COMPLETED
 import org.opensearch.commons.alerting.model.Alert.State.ERROR
+import org.opensearch.commons.alerting.model.DataSources
 import org.opensearch.commons.alerting.model.IntervalSchedule
 import org.opensearch.commons.alerting.model.Monitor
 import org.opensearch.commons.alerting.model.SearchInput
@@ -1321,6 +1322,143 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
         )
     }
 
+    fun `test bucket-level monitor with findings enabled`() {
+        val testIndex = createTestIndex()
+        insertSampleTimeSerializedData(
+            testIndex,
+            listOf(
+                "test_value_1",
+                "test_value_2"
+            )
+        )
+
+        val query = QueryBuilders.rangeQuery("test_strict_date_time")
+            .gt("{{period_end}}||-10d")
+            .lte("{{period_end}}")
+            .format("epoch_millis")
.format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field").field("test_field") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + val triggerScript = """ + params.docCount > 0 + """.trimIndent() + + // For the Actions ensure that there is at least one and any PER_ALERT actions contain ACTIVE, DEDUPED and COMPLETED in its policy + // so that the assertions done later in this test don't fail. + // The config is being mutated this way to still maintain the randomness in configuration (like including other ActionExecutionScope). + val actions = randomActionsForBucketLevelTrigger(min = 1).map { + if (it.actionExecutionPolicy?.actionExecutionScope is PerAlertActionScope) { + it.copy( + actionExecutionPolicy = ActionExecutionPolicy( + PerAlertActionScope(setOf(AlertCategory.NEW, AlertCategory.DEDUPED, AlertCategory.COMPLETED)) + ) + ) + } else { + it + } + } + var trigger = randomBucketLevelTrigger(actions = actions) + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null + ) + ) + val monitor = createMonitor( + randomBucketLevelMonitor( + inputs = listOf(input), + enabled = false, + triggers = listOf(trigger), + dataSources = DataSources(findingsEnabled = true) + ) + ) + executeMonitor(monitor.id) + + // Check created Alerts + var currentAlerts = searchAlerts(monitor) + assertEquals("Alerts not saved", 2, currentAlerts.size) + currentAlerts.forEach { alert -> + Assert.assertEquals("expected findings for alert", alert.findingIds.size, 1) + } + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2")) + } + + fun `test bucket-level monitor with findings enabled for multiple group by fields`() { + val testIndex = createTestIndex() + insertSampleTimeSerializedData( + testIndex, + listOf( + "test_value_1", + "test_value_2" + ) + ) + + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field").field("test_field"), + TermsValuesSourceBuilder("number").field("number") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + val triggerScript = """ + params.docCount > 0 + """.trimIndent() + + // For the Actions ensure that there is at least one and any PER_ALERT actions contain ACTIVE, DEDUPED and COMPLETED in its policy + // so that the assertions done later in this test don't fail. + // The config is being mutated this way to still maintain the randomness in configuration (like including other ActionExecutionScope). 
+        val actions = randomActionsForBucketLevelTrigger(min = 1).map {
+            if (it.actionExecutionPolicy?.actionExecutionScope is PerAlertActionScope) {
+                it.copy(
+                    actionExecutionPolicy = ActionExecutionPolicy(
+                        PerAlertActionScope(setOf(AlertCategory.NEW, AlertCategory.DEDUPED, AlertCategory.COMPLETED))
+                    )
+                )
+            } else {
+                it
+            }
+        }
+        var trigger = randomBucketLevelTrigger(actions = actions)
+        trigger = trigger.copy(
+            bucketSelector = BucketSelectorExtAggregationBuilder(
+                name = trigger.id,
+                bucketsPathsMap = mapOf("docCount" to "_count"),
+                script = Script(triggerScript),
+                parentBucketPath = "composite_agg",
+                filter = null
+            )
+        )
+        val monitor = createMonitor(
+            randomBucketLevelMonitor(
+                inputs = listOf(input),
+                enabled = false,
+                triggers = listOf(trigger),
+                dataSources = DataSources(findingsEnabled = true)
+            )
+        )
+        executeMonitor(monitor.id)
+
+        // Check created Alerts
+        var currentAlerts = searchAlerts(monitor)
+        assertEquals("Alerts not saved", 2, currentAlerts.size)
+        currentAlerts.forEach { alert ->
+            Assert.assertEquals("expected findings for alert", alert.findingIds.size, 0)
+        }
+        val findings = searchFindings(monitor)
+        assertEquals("Findings saved for test monitor", 0, findings.size)
+    }
+
     @Suppress("UNCHECKED_CAST")
     fun `test bucket-level monitor with one good action and one bad action`() {
         val testIndex = createTestIndex()
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt
index 2b99b514a..02a2b9d5d 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt
@@ -135,6 +135,32 @@ fun randomBucketLevelMonitor(
     )
 }
 
+fun randomBucketLevelMonitor(
+    name: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
+    user: User = randomUser(),
+    inputs: List<Input> = listOf(
+        SearchInput(
+            emptyList(),
+            SearchSourceBuilder().query(QueryBuilders.matchAllQuery())
+                .aggregation(TermsAggregationBuilder("test_agg").field("test_field"))
+        )
+    ),
+    schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES),
+    enabled: Boolean = randomBoolean(),
+    triggers: List<BucketLevelTrigger> = (1..randomInt(10)).map { randomBucketLevelTrigger() },
+    enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
+    lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
+    withMetadata: Boolean = false,
+    dataSources: DataSources
+): Monitor {
+    return Monitor(
+        name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
+        schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
+        uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(),
+        dataSources = dataSources
+    )
+}
+
 fun randomClusterMetricsMonitor(
     name: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
     user: User = randomUser(),
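
Illustrative sketch (not part of the patch): createFindings above only computes findings when the composite aggregation groups by a single field, and it then re-queries the source indices restricted to the bucket keys that fired the trigger. The snippet below restates that selection logic in plain Kotlin with simplified stand-in types (Doc and the field/bucket names are hypothetical, not the plugin's classes):

// Simplified stand-in for a source document; the real code works with SearchHit results.
data class Doc(val id: String, val index: String, val fields: Map<String, String>)

// Mirrors the idea in createFindings: findings are only computed when the monitor groups by
// exactly one field, and only documents whose value for that field matches one of the
// triggered bucket keys count as related docs.
fun selectRelatedDocs(docs: List<Doc>, groupByFields: List<String>, triggeredBucketKeys: Set<String>): List<Doc> {
    // More than one group-by field -> skip findings entirely, as the patch does.
    if (groupByFields.size != 1) return emptyList()
    val fieldName = groupByFields.first()
    return docs.filter { doc ->
        val value = doc.fields[fieldName]
        value != null && value in triggeredBucketKeys
    }
}

fun main() {
    val docs = listOf(
        Doc("1", "test-index", mapOf("test_field" to "test_value_1")),
        Doc("2", "test-index", mapOf("test_field" to "test_value_2")),
        Doc("3", "test-index", mapOf("test_field" to "other_value"))
    )
    // Buckets "test_value_1" and "test_value_2" fired the trigger -> docs 1 and 2 are related.
    println(selectRelatedDocs(docs, listOf("test_field"), setOf("test_value_1", "test_value_2")).map { it.id })
}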
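
Illustrative sketch (not part of the patch): before re-running the search, createFindings renders the monitor's query through scriptService.compile(..., TemplateScript.CONTEXT) so that {{period_start}}/{{period_end}} placeholders are replaced with the actual run window. The toy renderer below only illustrates that substitution idea; it is not the template engine the plugin uses:

import java.time.Instant

// Replaces {{key}} placeholders with values, loosely imitating the template-render step in createFindings.
fun renderQueryTemplate(queryJson: String, params: Map<String, Any>): String =
    params.entries.fold(queryJson) { acc, (key, value) -> acc.replace("{{$key}}", value.toString()) }

fun main() {
    val periodStart = Instant.parse("2022-11-07T00:00:00Z")
    val periodEnd = Instant.parse("2022-11-07T01:00:00Z")
    // Same shape as the range query used in the new bucket-level monitor tests.
    val query = """{"range":{"test_strict_date_time":{"gt":"{{period_end}}||-10d","lte":"{{period_end}}","format":"epoch_millis"}}}"""
    val rendered = renderQueryTemplate(
        query,
        mapOf("period_start" to periodStart.toEpochMilli(), "period_end" to periodEnd.toEpochMilli())
    )
    println(rendered)
}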
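
Illustrative sketch (not part of the patch): createFindingPerIndex groups the matching document ids by source index and writes one finding per index, which is why the single-field test above expects exactly one finding containing both doc ids. A trimmed-down version of that grouping, with FindingSketch as a hypothetical stand-in for the commons Finding model:

import java.time.Instant
import java.util.UUID

// Hypothetical, trimmed-down stand-in for the Finding document written to the findings index.
data class FindingSketch(
    val id: String,
    val monitorId: String,
    val index: String,
    val relatedDocIds: List<String>,
    val timestamp: Instant
)

// One finding per source index, covering all doc ids that matched the triggered buckets.
fun groupIntoFindings(monitorId: String, hits: List<Pair<String, String>>): List<FindingSketch> {
    // hits are (index name, doc id) pairs, mirroring searchResponse.hits.hits in the patch.
    return hits.groupBy({ it.first }, { it.second }).map { (index, docIds) ->
        FindingSketch(
            id = UUID.randomUUID().toString(),
            monitorId = monitorId,
            index = index,
            relatedDocIds = docIds,
            timestamp = Instant.now()
        )
    }
}

fun main() {
    val findings = groupIntoFindings("monitor-1", listOf("index-a" to "1", "index-a" to "2", "index-b" to "7"))
    findings.forEach { println("${it.index} -> ${it.relatedDocIds}") }
}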
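
Illustrative sketch (not part of the patch): both the alert and finding writes go through retryPolicy.retry(...), re-submitting only the bulk items rejected with 429 (TOO_MANY_REQUESTS), and the bulk requests now use WriteRequest.RefreshPolicy.IMMEDIATE so findings are searchable right after the run. The loop below shows the retry-the-rejected-items shape with invented request/response types, independent of the OpenSearch client API:

// Invented stand-ins for bulk items and their per-item outcome; the plugin uses BulkRequest/BulkResponse.
data class BulkItem(val id: String, val payload: String)
data class BulkItemResult(val item: BulkItem, val status: Int)

// Submits items in bulk and keeps retrying only the ones rejected with 429,
// the same shape as the retryPolicy.retry { ... } blocks in the patch.
fun indexWithRetry(
    items: List<BulkItem>,
    maxAttempts: Int,
    submit: (List<BulkItem>) -> List<BulkItemResult>
) {
    var pending = items
    var attempt = 0
    while (pending.isNotEmpty() && attempt < maxAttempts) {
        val results = submit(pending)
        // Only 429s are worth retrying; other failures would be surfaced or logged instead.
        pending = results.filter { it.status == 429 }.map { it.item }
        attempt++
    }
}

fun main() {
    var flaky = true
    indexWithRetry(listOf(BulkItem("f1", "{}"), BulkItem("f2", "{}")), maxAttempts = 3) { batch ->
        // First call rejects everything with 429, second call succeeds.
        val status = if (flaky) 429 else 201
        flaky = false
        batch.map { BulkItemResult(it, status) }
    }
    println("done")
}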