From dfc20d9ab8117cf57987389af74d2f08e1f8bd6f Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Tue, 8 Nov 2022 16:11:41 -0800 Subject: [PATCH] fix bucket level monitor findings to support term aggs in query (#666) (#668) Signed-off-by: Surya Sashank Nistala (cherry picked from commit 0fb9dd04ccd1356b67e320173954cbc928eba4fd) Co-authored-by: Surya Sashank Nistala --- .../alerting/BucketLevelMonitorRunner.kt | 58 +++++++++++----- .../alerting/MonitorRunnerServiceIT.kt | 69 ++++++++++++++++++- 2 files changed, 110 insertions(+), 17 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index d3f534528..3c4fc6425 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -46,6 +46,7 @@ import org.opensearch.script.ScriptType import org.opensearch.script.TemplateScript import org.opensearch.search.aggregations.AggregatorFactories import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder +import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder import org.opensearch.search.builder.SearchSourceBuilder import java.time.Instant import java.util.UUID @@ -71,6 +72,9 @@ object BucketLevelMonitorRunner : MonitorRunner() { val currentAlerts = try { monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources) monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources) + if (monitor.dataSources.findingsEnabled == true) { + monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex(monitor.dataSources) + } monitorCtx.alertService!!.loadCurrentAlertsForBucketLevelMonitor(monitor) } catch (e: Exception) { // We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts @@ -142,15 +146,19 @@ object BucketLevelMonitorRunner : MonitorRunner() { */ if (triggerResults[trigger.id]?.error != null) continue val findings = - if (monitor.triggers.size == 1 && monitor.dataSources.findingsEnabled == true) createFindings( - triggerResult, - monitor, - monitorCtx, - periodStart, - periodEnd, - !dryrun && monitor.id != Monitor.NO_ID - ) - else emptyList() + if (monitor.triggers.size == 1 && monitor.dataSources.findingsEnabled == true) { + logger.debug("Creating bucket level findings") + createFindings( + triggerResult, + monitor, + monitorCtx, + periodStart, + periodEnd, + !dryrun && monitor.id != Monitor.NO_ID + ) + } else { + emptyList() + } // TODO: Should triggerResult's aggregationResultBucket be a list? If not, getCategorizedAlertsForBucketLevelMonitor can // be refactored to use a map instead val categorizedAlerts = monitorCtx.alertService!!.getCategorizedAlertsForBucketLevelMonitor( @@ -334,15 +342,30 @@ object BucketLevelMonitorRunner : MonitorRunner() { val bucketValues: Set = triggerResult.aggregationResultBuckets.keys val query = input.query var fieldName = "" - var grouByFields = 0 // if number of fields used to group by > 1 we won't calculate findings + for (aggFactory in (query.aggregations() as AggregatorFactories.Builder).aggregatorFactories) { - val sources = (aggFactory as CompositeAggregationBuilder).sources() - for (source in sources) { - if (grouByFields > 0) { + when (aggFactory) { + is CompositeAggregationBuilder -> { + var grouByFields = 0 // if number of fields used to group by > 1 we won't calculate findings + val sources = aggFactory.sources() + for (source in sources) { + if (grouByFields > 0) { + logger.error("grouByFields > 0. not generating findings for bucket level monitor ${monitor.id}") + return listOf() + } + grouByFields++ + fieldName = source.field() + } + } + is TermsAggregationBuilder -> { + fieldName = aggFactory.field() + } + else -> { + logger.error( + "Bucket level monitor findings supported only for composite and term aggs. Found [{${aggFactory.type}}]" + ) return listOf() } - grouByFields++ - fieldName = source.field() } } if (fieldName != "") { @@ -370,6 +393,8 @@ object BucketLevelMonitorRunner : MonitorRunner() { } val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) } return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding) + } else { + logger.error("Couldn't resolve groupBy field. Not generating bucket level monitor findings for monitor %${monitor.id}") } } } @@ -403,8 +428,9 @@ object BucketLevelMonitorRunner : MonitorRunner() { ) val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() - logger.debug("Findings: $findingStr") + logger.debug("Bucket level monitor ${monitor.id} Findings: $findingStr") if (shouldCreateFinding) { + logger.debug("Saving bucket level monitor findings for monitor ${monitor.id}") val indexRequest = IndexRequest(monitor.dataSources.findingsIndex) .source(findingStr, XContentType.JSON) .id(finding.id) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index 8b99ec5dd..4ce7dcd23 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -40,6 +40,7 @@ import org.opensearch.rest.RestStatus import org.opensearch.script.Script import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder +import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder import org.opensearch.search.builder.SearchSourceBuilder import java.net.URLEncoder import java.time.Instant @@ -1322,7 +1323,73 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { ) } - fun `test bucket-level monitor with findings enabled`() { + fun `test bucket-level monitor with findings enabled on term agg`() { + val testIndex = createTestIndex() + insertSampleTimeSerializedData( + testIndex, + listOf( + "test_value_1", + "test_value_2" + ) + ) + + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val termAgg = TermsAggregationBuilder("test_field").field("test_field") + val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(termAgg)) + val triggerScript = """ + params.docCount > 0 + """.trimIndent() + + // For the Actions ensure that there is at least one and any PER_ALERT actions contain ACTIVE, DEDUPED and COMPLETED in its policy + // so that the assertions done later in this test don't fail. + // The config is being mutated this way to still maintain the randomness in configuration (like including other ActionExecutionScope). + val actions = randomActionsForBucketLevelTrigger(min = 1).map { + if (it.actionExecutionPolicy?.actionExecutionScope is PerAlertActionScope) { + it.copy( + actionExecutionPolicy = ActionExecutionPolicy( + PerAlertActionScope(setOf(AlertCategory.NEW, AlertCategory.DEDUPED, AlertCategory.COMPLETED)) + ) + ) + } else { + it + } + } + var trigger = randomBucketLevelTrigger(actions = actions) + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "test_field", + filter = null + ) + ) + val monitor = createMonitor( + randomBucketLevelMonitor( + inputs = listOf(input), + enabled = false, + triggers = listOf(trigger), + dataSources = DataSources(findingsEnabled = true) + ) + ) + executeMonitor(monitor.id) + + // Check created Alerts + var currentAlerts = searchAlerts(monitor) + assertEquals("Alerts not saved", 2, currentAlerts.size) + currentAlerts.forEach { alert -> + Assert.assertEquals("expected findings for alert", alert.findingIds.size, 1) + } + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2")) + } + + fun `test bucket-level monitor with findings enabled on composite agg`() { val testIndex = createTestIndex() insertSampleTimeSerializedData( testIndex,