Skip to content

Commit

Permalink
fix bucket level monitor findings to support term aggs in query (open…
Browse files Browse the repository at this point in the history
…search-project#666) (opensearch-project#668)

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
(cherry picked from commit 0fb9dd04ccd1356b67e320173954cbc928eba4fd)

Co-authored-by: Surya Sashank Nistala <snistala@amazon.com>
  • Loading branch information
opensearch-trigger-bot[bot] and eirsep authored Nov 9, 2022
1 parent b8c0fdd commit dfc20d9
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript
import org.opensearch.search.aggregations.AggregatorFactories
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant
import java.util.UUID
Expand All @@ -71,6 +72,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
val currentAlerts = try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources)
if (monitor.dataSources.findingsEnabled == true) {
monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex(monitor.dataSources)
}
monitorCtx.alertService!!.loadCurrentAlertsForBucketLevelMonitor(monitor)
} catch (e: Exception) {
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
Expand Down Expand Up @@ -142,15 +146,19 @@ object BucketLevelMonitorRunner : MonitorRunner() {
*/
if (triggerResults[trigger.id]?.error != null) continue
val findings =
if (monitor.triggers.size == 1 && monitor.dataSources.findingsEnabled == true) createFindings(
triggerResult,
monitor,
monitorCtx,
periodStart,
periodEnd,
!dryrun && monitor.id != Monitor.NO_ID
)
else emptyList()
if (monitor.triggers.size == 1 && monitor.dataSources.findingsEnabled == true) {
logger.debug("Creating bucket level findings")
createFindings(
triggerResult,
monitor,
monitorCtx,
periodStart,
periodEnd,
!dryrun && monitor.id != Monitor.NO_ID
)
} else {
emptyList()
}
// TODO: Should triggerResult's aggregationResultBucket be a list? If not, getCategorizedAlertsForBucketLevelMonitor can
// be refactored to use a map instead
val categorizedAlerts = monitorCtx.alertService!!.getCategorizedAlertsForBucketLevelMonitor(
Expand Down Expand Up @@ -334,15 +342,30 @@ object BucketLevelMonitorRunner : MonitorRunner() {
val bucketValues: Set<String> = triggerResult.aggregationResultBuckets.keys
val query = input.query
var fieldName = ""
var grouByFields = 0 // if number of fields used to group by > 1 we won't calculate findings

for (aggFactory in (query.aggregations() as AggregatorFactories.Builder).aggregatorFactories) {
val sources = (aggFactory as CompositeAggregationBuilder).sources()
for (source in sources) {
if (grouByFields > 0) {
when (aggFactory) {
is CompositeAggregationBuilder -> {
var grouByFields = 0 // if number of fields used to group by > 1 we won't calculate findings
val sources = aggFactory.sources()
for (source in sources) {
if (grouByFields > 0) {
logger.error("grouByFields > 0. not generating findings for bucket level monitor ${monitor.id}")
return listOf()
}
grouByFields++
fieldName = source.field()
}
}
is TermsAggregationBuilder -> {
fieldName = aggFactory.field()
}
else -> {
logger.error(
"Bucket level monitor findings supported only for composite and term aggs. Found [{${aggFactory.type}}]"
)
return listOf()
}
grouByFields++
fieldName = source.field()
}
}
if (fieldName != "") {
Expand Down Expand Up @@ -370,6 +393,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
}
val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding)
} else {
logger.error("Couldn't resolve groupBy field. Not generating bucket level monitor findings for monitor %${monitor.id}")
}
}
}
Expand Down Expand Up @@ -403,8 +428,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
)

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
logger.debug("Findings: $findingStr")
logger.debug("Bucket level monitor ${monitor.id} Findings: $findingStr")
if (shouldCreateFinding) {
logger.debug("Saving bucket level monitor findings for monitor ${monitor.id}")
val indexRequest = IndexRequest(monitor.dataSources.findingsIndex)
.source(findingStr, XContentType.JSON)
.id(finding.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import java.net.URLEncoder
import java.time.Instant
Expand Down Expand Up @@ -1322,7 +1323,73 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
)
}

fun `test bucket-level monitor with findings enabled`() {
fun `test bucket-level monitor with findings enabled on term agg`() {
val testIndex = createTestIndex()
insertSampleTimeSerializedData(
testIndex,
listOf(
"test_value_1",
"test_value_2"
)
)

val query = QueryBuilders.rangeQuery("test_strict_date_time")
.gt("{{period_end}}||-10d")
.lte("{{period_end}}")
.format("epoch_millis")
val termAgg = TermsAggregationBuilder("test_field").field("test_field")
val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(termAgg))
val triggerScript = """
params.docCount > 0
""".trimIndent()

// For the Actions ensure that there is at least one and any PER_ALERT actions contain ACTIVE, DEDUPED and COMPLETED in its policy
// so that the assertions done later in this test don't fail.
// The config is being mutated this way to still maintain the randomness in configuration (like including other ActionExecutionScope).
val actions = randomActionsForBucketLevelTrigger(min = 1).map {
if (it.actionExecutionPolicy?.actionExecutionScope is PerAlertActionScope) {
it.copy(
actionExecutionPolicy = ActionExecutionPolicy(
PerAlertActionScope(setOf(AlertCategory.NEW, AlertCategory.DEDUPED, AlertCategory.COMPLETED))
)
)
} else {
it
}
}
var trigger = randomBucketLevelTrigger(actions = actions)
trigger = trigger.copy(
bucketSelector = BucketSelectorExtAggregationBuilder(
name = trigger.id,
bucketsPathsMap = mapOf("docCount" to "_count"),
script = Script(triggerScript),
parentBucketPath = "test_field",
filter = null
)
)
val monitor = createMonitor(
randomBucketLevelMonitor(
inputs = listOf(input),
enabled = false,
triggers = listOf(trigger),
dataSources = DataSources(findingsEnabled = true)
)
)
executeMonitor(monitor.id)

// Check created Alerts
var currentAlerts = searchAlerts(monitor)
assertEquals("Alerts not saved", 2, currentAlerts.size)
currentAlerts.forEach { alert ->
Assert.assertEquals("expected findings for alert", alert.findingIds.size, 1)
}
val findings = searchFindings(monitor)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2"))
}

fun `test bucket-level monitor with findings enabled on composite agg`() {
val testIndex = createTestIndex()
insertSampleTimeSerializedData(
testIndex,
Expand Down

0 comments on commit dfc20d9

Please sign in to comment.