Skip to content

Commit

Permalink
Adds findings in bucket level monitor (opensearch-project#636) (opensearch-project#651)
Browse files Browse the repository at this point in the history

* bucket level monitor findings

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* add test to verify bucket level monitor findings

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* added tests. fixed document ids in bucket level monitor findings

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
(cherry picked from commit 5b451b988b7cad0b5a1076daa8908c2fd68db154)

Co-authored-by: Surya Sashank Nistala <snistala@amazon.com>
  • Loading branch information
opensearch-trigger-bot[bot] and eirsep authored Nov 7, 2022
1 parent 29f26c9 commit b612487
Show file tree
Hide file tree
Showing 6 changed files with 336 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.opensearch.action.delete.DeleteRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
Expand Down Expand Up @@ -236,7 +237,8 @@ class AlertService(
monitor: Monitor,
trigger: BucketLevelTrigger,
currentAlerts: MutableMap<String, Alert>,
aggResultBuckets: List<AggregationResultBucket>
aggResultBuckets: List<AggregationResultBucket>,
findings: List<String>
): Map<AlertCategory, List<Alert>> {
val dedupedAlerts = mutableListOf<Alert>()
val newAlerts = mutableListOf<Alert>()
Expand All @@ -256,7 +258,8 @@ class AlertService(
monitor = monitor, trigger = trigger, startTime = currentTime,
lastNotificationTime = null, state = Alert.State.ACTIVE, errorMessage = null,
errorHistory = mutableListOf(), actionExecutionResults = mutableListOf(),
schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket
schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket,
findingIds = findings
)
newAlerts.add(newAlert)
}
Expand Down Expand Up @@ -381,7 +384,7 @@ class AlertService(
// If the index request is to be retried, the Alert is saved separately as well so that its relative ordering is maintained in
// relation to index request in the retried bulk request for when it eventually succeeds.
retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(requestsToRetry)
val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) }
// TODO: This is only used to retrieve the retryCause, could instead fetch it from the bulkResponse iteration below
val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,49 @@
package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.opensearchapi.withClosableContext
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
import org.opensearch.commons.alerting.model.action.PerExecutionActionScope
import org.opensearch.commons.alerting.util.string
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript
import org.opensearch.search.aggregations.AggregatorFactories
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant
import java.util.UUID

object BucketLevelMonitorRunner : MonitorRunner() {
private val logger = LogManager.getLogger(javaClass)
Expand Down Expand Up @@ -116,11 +141,20 @@ object BucketLevelMonitorRunner : MonitorRunner() {
* existing Alerts in a way the user can easily view them since they will have all been moved to the history index.
*/
if (triggerResults[trigger.id]?.error != null) continue

val findings =
if (monitor.triggers.size == 1 && monitor.dataSources.findingsEnabled == true) createFindings(
triggerResult,
monitor,
monitorCtx,
periodStart,
periodEnd,
!dryrun && monitor.id != Monitor.NO_ID
)
else emptyList()
// TODO: Should triggerResult's aggregationResultBucket be a list? If not, getCategorizedAlertsForBucketLevelMonitor can
// be refactored to use a map instead
val categorizedAlerts = monitorCtx.alertService!!.getCategorizedAlertsForBucketLevelMonitor(
monitor, trigger, currentAlertsForTrigger, triggerResult.aggregationResultBuckets.values.toList()
monitor, trigger, currentAlertsForTrigger, triggerResult.aggregationResultBuckets.values.toList(), findings
).toMutableMap()
val dedupedAlerts = categorizedAlerts.getOrDefault(AlertCategory.DEDUPED, emptyList())
var newAlerts = categorizedAlerts.getOrDefault(AlertCategory.NEW, emptyList())
Expand Down Expand Up @@ -287,6 +321,117 @@ object BucketLevelMonitorRunner : MonitorRunner() {
return monitorResult.copy(inputResults = firstPageOfInputResults, triggerResults = triggerResults)
}

/**
 * Collects the source documents that contributed to the trigger's aggregation result buckets and,
 * when [shouldCreateFinding] is true, persists one Finding per index for them (via
 * [createFindingPerIndex]).
 *
 * Findings are only computed when the monitor's [SearchInput] groups by exactly one field through
 * a composite aggregation; any other shape (zero or multiple group-by fields) returns an empty list.
 *
 * @param triggerResult buckets produced by the bucket-level trigger for this execution; the bucket
 *   keys are used to filter the source documents back out of the input indices
 * @param shouldCreateFinding false for dry runs and unsaved monitors — findings are then neither
 *   computed past this call nor indexed
 * @return ids of the findings created, or an empty list when findings are not applicable
 */
private suspend fun createFindings(
    triggerResult: BucketLevelTriggerRunResult,
    monitor: Monitor,
    monitorCtx: MonitorRunnerExecutionContext,
    periodStart: Instant,
    periodEnd: Instant,
    shouldCreateFinding: Boolean
): List<String> {
    monitor.inputs.forEach { input ->
        if (input is SearchInput) {
            // Bucket keys selected by the trigger; used below as a terms filter on the group-by field.
            val bucketValues: Set<String> = triggerResult.aggregationResultBuckets.keys
            val query = input.query
            var fieldName = ""
            var grouByFields = 0 // if number of fields used to group by > 1 we won't calculate findings
            // NOTE(review): the unconditional cast assumes every top-level aggregation is a
            // CompositeAggregationBuilder — confirm upstream monitor validation guarantees this,
            // otherwise this throws ClassCastException for other aggregation types.
            for (aggFactory in (query.aggregations() as AggregatorFactories.Builder).aggregatorFactories) {
                val sources = (aggFactory as CompositeAggregationBuilder).sources()
                for (source in sources) {
                    // A second group-by source means findings are unsupported: bail out early.
                    if (grouByFields > 0) {
                        return listOf()
                    }
                    grouByFields++
                    fieldName = source.field()
                }
            }
            if (fieldName != "") {
                // Re-render the monitor's query template with the current execution window so the
                // document search covers the same period the trigger evaluated.
                val searchParams = mapOf(
                    "period_start" to periodStart.toEpochMilli(),
                    "period_end" to periodEnd.toEpochMilli()
                )
                val searchSource = monitorCtx.scriptService!!.compile(
                    Script(
                        ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
                        query.toString(), searchParams
                    ),
                    TemplateScript.CONTEXT
                )
                    .newInstance(searchParams)
                    .execute()
                val sr = SearchRequest(*input.indices.toTypedArray())
                // Parse the rendered source and restrict the original query to only the documents
                // whose group-by field value matches one of the triggered bucket keys.
                XContentType.JSON.xContent().createParser(monitorCtx.xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource)
                    .use {
                        val source = SearchSourceBuilder.fromXContent(it)
                        val queryBuilder = if (input.query.query() == null) BoolQueryBuilder()
                        else QueryBuilders.boolQuery().must(source.query())
                        queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues))
                        // NOTE(review): only the rewritten query is applied to the request; the rest of
                        // the parsed source (size, sort, aggregations) is discarded — confirm intended.
                        // Also verify `sr.source()` is non-null here (i.e. SearchRequest initializes a
                        // default SearchSourceBuilder); otherwise this line NPEs.
                        sr.source().query(queryBuilder)
                    }
                val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
                return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding)
            }
        }
    }
    // No SearchInput with a single group-by field: findings are not applicable.
    return listOf()
}

/**
 * Builds one [Finding] per source index from the given search hits and, when
 * [shouldCreateFinding] is true, bulk-indexes them into the monitor's findings index,
 * retrying requests rejected with TOO_MANY_REQUESTS.
 *
 * @param searchResponse hits for the documents that matched the trigger's buckets
 * @param shouldCreateFinding when false (e.g. dry run), findings are only built and logged,
 *   never persisted, and an empty list is returned
 * @return ids of the findings that were submitted for indexing; empty when nothing was persisted
 */
private suspend fun createFindingPerIndex(
    searchResponse: SearchResponse,
    monitor: Monitor,
    monitorCtx: MonitorRunnerExecutionContext,
    shouldCreateFinding: Boolean
): List<String> {
    // Group the matched document ids by the index they came from.
    val docIdsByIndexName: MutableMap<String, MutableList<String>> = mutableMapOf()
    for (hit in searchResponse.hits.hits) {
        docIdsByIndexName.getOrPut(hit.index) { mutableListOf() }.add(hit.id)
    }
    val findings = mutableListOf<String>()
    var requestsToRetry: MutableList<IndexRequest> = mutableListOf()
    docIdsByIndexName.entries.forEach { entry ->
        val finding = Finding(
            id = UUID.randomUUID().toString(),
            relatedDocIds = entry.value,
            monitorId = monitor.id,
            monitorName = monitor.name,
            index = entry.key,
            timestamp = Instant.now(),
            docLevelQueries = listOf()
        )

        val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
        logger.debug("Findings: $findingStr")
        if (shouldCreateFinding) {
            requestsToRetry.add(
                IndexRequest(monitor.dataSources.findingsIndex)
                    .source(findingStr, XContentType.JSON)
                    .id(finding.id)
                    .routing(finding.id)
            )
        }
        findings.add(finding.id)
    }
    // Nothing to persist (e.g. dry run): don't report ids for findings that were never indexed.
    if (requestsToRetry.isEmpty()) return listOf()
    monitorCtx.retryPolicy!!.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
        // IMMEDIATE refresh so findings are searchable as soon as the bulk call returns.
        val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
        val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.bulk(bulkRequest, it) }
        // Keep only the requests rejected for back-pressure; the retry policy re-runs this
        // lambda with them. (Bug fix: the previous version also indexed into an empty local
        // list here, throwing IndexOutOfBoundsException whenever a retry was needed.)
        requestsToRetry = bulkResponse.items
            .filter { it.isFailed && it.status() == RestStatus.TOO_MANY_REQUESTS }
            .mapTo(mutableListOf()) { bulkRequest.requests()[it.itemId] as IndexRequest }
    }
    return findings
}

private fun getActionContextForAlertCategory(
alertCategory: AlertCategory,
alert: Alert,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ class AlertServiceTests : OpenSearchTestCase() {
)
)

val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets)
val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
monitor, trigger, currentAlerts, aggResultBuckets, emptyList()
)
// Completed Alerts are what remains in currentAlerts after categorization
val completedAlerts = currentAlerts.values.toList()
assertEquals(listOf<Alert>(), categorizedAlerts[AlertCategory.DEDUPED])
Expand Down Expand Up @@ -115,7 +117,9 @@ class AlertServiceTests : OpenSearchTestCase() {
)
)

val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets)
val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
monitor, trigger, currentAlerts, aggResultBuckets, emptyList()
)
// Completed Alerts are what remains in currentAlerts after categorization
val completedAlerts = currentAlerts.values.toList()
assertAlertsExistForBucketKeys(
Expand All @@ -142,7 +146,9 @@ class AlertServiceTests : OpenSearchTestCase() {
)
val aggResultBuckets = listOf<AggregationResultBucket>()

val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets)
val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
monitor, trigger, currentAlerts, aggResultBuckets, emptyList()
)
// Completed Alerts are what remains in currentAlerts after categorization
val completedAlerts = currentAlerts.values.toList()
assertEquals(listOf<Alert>(), categorizedAlerts[AlertCategory.DEDUPED])
Expand Down Expand Up @@ -174,7 +180,9 @@ class AlertServiceTests : OpenSearchTestCase() {
)
)

val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets)
val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
monitor, trigger, currentAlerts, aggResultBuckets, emptyList()
)
// Completed Alerts are what remains in currentAlerts after categorization
val completedAlerts = currentAlerts.values.toList()
assertAlertsExistForBucketKeys(listOf(listOf("b")), categorizedAlerts[AlertCategory.DEDUPED] ?: error("Deduped alerts not found"))
Expand All @@ -198,7 +206,9 @@ class AlertServiceTests : OpenSearchTestCase() {
)
)

val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets)
val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
monitor, trigger, currentAlerts, aggResultBuckets, emptyList()
)
// Completed Alerts are what remains in currentAlerts after categorization
val completedAlerts = currentAlerts.values.toList()
assertAlertsExistForBucketKeys(listOf(listOf("a")), categorizedAlerts[AlertCategory.DEDUPED] ?: error("Deduped alerts not found"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,8 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
"""
"properties" : {
"test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
"test_field" : { "type" : "keyword" }
"test_field" : { "type" : "keyword" },
"number" : { "type" : "keyword" }
}
""".trimIndent()
)
Expand Down Expand Up @@ -866,7 +867,8 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
val testDoc = """
{
"test_strict_date_time": "$testTime",
"test_field": "$value"
"test_field": "$value",
"number": "$i"
}
""".trimIndent()
// Indexing documents with deterministic doc id to allow for easy selected deletion during testing
Expand Down
Loading

0 comments on commit b612487

Please sign in to comment.