Skip to content

Commit

Permalink
optimize bucket level monitor to resolve alias to query only those ti…
Browse files Browse the repository at this point in the history
…me-series indices that contain docs within timeframe of range query filter in search input

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
  • Loading branch information
eirsep committed Oct 18, 2024
1 parent 5b27d24 commit 67be543
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,17 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerSettings(settings)
.registerThreadPool(threadPool)
.registerAlertIndices(alertIndices)
.registerInputService(InputService(client, scriptService, namedWriteableRegistry, xContentRegistry, clusterService, settings))
.registerInputService(
InputService(
client,
scriptService,
namedWriteableRegistry,
xContentRegistry,
clusterService,
settings,
indexNameExpressionResolver
)
)
.registerTriggerService(triggerService)
.registerAlertService(alertService)
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
Expand Down
121 changes: 119 additions & 2 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AggregationQueryRewriter
import org.opensearch.alerting.util.CrossClusterMonitorUtils
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
import org.opensearch.alerting.util.getRoleFilterEnabled
import org.opensearch.alerting.util.use
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.io.stream.BytesStreamOutput
Expand All @@ -40,12 +42,14 @@ import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.MatchQueryBuilder
import org.opensearch.index.query.QueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.query.RangeQueryBuilder
import org.opensearch.index.query.TermsQueryBuilder
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Duration
import java.time.Instant

/** Service that handles the collection of input results for Monitor executions */
Expand All @@ -55,7 +59,8 @@ class InputService(
val namedWriteableRegistry: NamedWriteableRegistry,
val xContentRegistry: NamedXContentRegistry,
val clusterService: ClusterService,
val settings: Settings
val settings: Settings,
val indexNameExpressionResolver: IndexNameExpressionResolver
) {

private val logger = LogManager.getLogger(InputService::class.java)
Expand Down Expand Up @@ -245,8 +250,9 @@ class InputService(
.execute()

val indexes = CrossClusterMonitorUtils.parseIndexesForRemoteSearch(searchInput.indices, clusterService)
val resolvedIndexes = resolveOnlyQueryableIndicesFromLocalClusterAliases(monitor, periodEnd, searchInput.query.query(), indexes)
val searchRequest = SearchRequest()
.indices(*indexes.toTypedArray())
.indices(*resolvedIndexes.toTypedArray())
.preference(Preference.PRIMARY_FIRST.type())

XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
Expand All @@ -256,6 +262,69 @@ class InputService(
return searchRequest
}

/**
 * Narrows local-cluster aliases down to the backing indices that can hold documents inside the
 * time range of the monitor's search query.
 *
 * Every entry of [indexes] is handled independently:
 *
 * - Remote-cluster entries are passed through untouched; no optimization is attempted for them.
 * - Entries that are not aliases in the local cluster state are passed through untouched.
 * - Aliases are passed through untouched when [resolveStartTimeofQueryTimeRange] cannot derive the
 *   start of the queried time range, or when none of the alias' backing indices has metadata in
 *   the cluster state.
 * - Otherwise the alias is replaced by its backing indices whose creation date is at or after the
 *   start of the time range, plus the single newest index created before that start. That older
 *   index is needed because it was still receiving documents when the time range began.
 *
 * @param monitor monitor being executed; forwarded to [resolveStartTimeofQueryTimeRange]
 * @param periodEnd end of the monitor execution period; forwarded to [resolveStartTimeofQueryTimeRange]
 * @param query query of the search input, inspected for a range clause
 * @param indexes index names, aliases and remote-cluster expressions of the search input
 * @return the index expressions to put on the search request, in the order they were resolved
 */
private fun resolveOnlyQueryableIndicesFromLocalClusterAliases(
    monitor: Monitor,
    periodEnd: Instant,
    query: QueryBuilder,
    indexes: List<String>,
): List<String> {
    // The start time does not depend on the entry being inspected, so resolve it at most once,
    // and only if a local alias is actually encountered.
    val startOfQueryTimeRange: Instant? by lazy { resolveStartTimeofQueryTimeRange(monitor, query, periodEnd) }
    val resolvedIndexes = ArrayList<String>()

    for (indexOrAlias in indexes) {
        // we don't optimize for remote cluster aliases. we directly pass them to search request
        if (CrossClusterMonitorUtils.isRemoteClusterIndex(indexOrAlias, clusterService)) {
            resolvedIndexes.add(indexOrAlias)
            continue
        }

        val state = clusterService.state()
        val startMillis = if (IndexUtils.isAlias(indexOrAlias, state)) startOfQueryTimeRange?.toEpochMilli() else null
        if (startMillis == null) {
            // Not an alias, or the time range could not be determined: query it as given.
            resolvedIndexes.add(indexOrAlias)
            continue
        }

        val sortedIndices = IndexUtils
            .resolveAllIndices(listOf(indexOrAlias), clusterService, indexNameExpressionResolver)
            .mapNotNull { indexName -> state.metadata().index(indexName) } // Get IndexMetadata for each index
            .sortedBy { indexMetadata -> indexMetadata.creationDate } // Oldest first
        if (sortedIndices.isEmpty()) {
            // Nothing to choose from. Keep the alias itself rather than contributing no index at
            // all, which could leave the search request with an empty index list.
            resolvedIndexes.add(indexOrAlias)
            continue
        }

        // Position of the oldest index created inside the time range (-1 if there is none).
        val firstInRange = sortedIndices.indexOfFirst { indexMetadata -> indexMetadata.creationDate >= startMillis }
        // Start one position earlier so the index immediately preceding the time range is included.
        val firstToQuery = when {
            firstInRange < 0 -> sortedIndices.lastIndex // all indices predate the range: only the newest can hold data
            firstInRange == 0 -> 0 // no index predates the range
            else -> firstInRange - 1
        }
        sortedIndices.drop(firstToQuery).forEach { indexMetadata -> resolvedIndexes.add(indexMetadata.index.name) }
    }
    return resolvedIndexes
}

private suspend fun handleClusterMetricsInput(input: ClusterMetricsInput): MutableList<Map<String, Any>> {
logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType)

Expand Down Expand Up @@ -289,4 +358,52 @@ class InputService(
}
return results
}

/**
 * Derives the start of the time window targeted by the range query found in [query].
 *
 * Only a lower bound written exactly as `{{period_end}}||-<amount><unit>` is understood, where
 * `<unit>` is one of the fixed-length date-math units `s`, `m`, `h`/`H`, `d` or `w`. Anything else
 * (no range query, no lower bound, a different anchor such as `now`, rounding like `/d`, or chained
 * date math) yields `null`, because subtracting a single duration from [periodEnd] would not
 * reliably describe the real start of the range.
 *
 * @param monitor monitor being executed; only its id is used, for log messages
 * @param query query of the search input
 * @param periodEnd end of the monitor execution period, i.e. the value of `{{period_end}}`
 * @return [periodEnd] minus the timeframe, or `null` when the timeframe cannot be determined, in
 * which case the caller must not narrow down the indices to query
 */
fun resolveStartTimeofQueryTimeRange(monitor: Monitor, query: QueryBuilder, periodEnd: Instant): Instant? {
    try {
        val rangeQuery = findRangeQuery(query) ?: return null
        // A range query may have an upper bound only.
        val lowerBound = rangeQuery.from()?.toString() ?: return null

        // we are looking for 'timeframe' variable {{period_end}}||-<timeframe>
        // The whole expression has to match so that suffixes like "/d" or "-2h" are not silently dropped.
        val timeframeRegex = Regex("""\s*\{\{\s*period_end\s*\}\}\|\|-(\d+)([a-zA-Z]+)\s*""")
        val matchResult = timeframeRegex.matchEntire(lowerBound)
        if (matchResult == null) {
            logger.debug(
                "Monitor {}: lower bound [{}] of range query is not of the form {{period_end}}||-<timeframe>," +
                    " skipping alias optimization",
                monitor.id,
                lowerBound
            )
            return null
        }

        val (amount, unit) = matchResult.destructured
        val duration = when (unit) {
            "s" -> Duration.ofSeconds(amount.toLong())
            "m" -> Duration.ofMinutes(amount.toLong())
            "h", "H" -> Duration.ofHours(amount.toLong())
            "d" -> Duration.ofDays(amount.toLong())
            "w" -> Duration.ofDays(amount.toLong()).multipliedBy(7)
            // Months and years have no fixed length, so they cannot be expressed as a Duration.
            else -> throw IllegalArgumentException("Invalid time unit: $unit")
        }

        return periodEnd.minus(duration)
    } catch (e: Exception) {
        logger.error(
            "Monitor ${monitor.id}:" +
                " Failed to resolve time frame of search query while optimizing to query only on few of alias' concrete indices",
            e
        )
        return null // won't do optimization as we failed to resolve the timeframe due to unexpected error
    }
}

/**
 * Finds a range query that every matching document has to satisfy.
 *
 * The search descends depth-first through [BoolQueryBuilder]s, looking at `must` clauses first and
 * `filter` clauses second, and returns the first [RangeQueryBuilder] encountered.
 *
 * `should` and `must_not` clauses are deliberately not searched: a range found there does not
 * necessarily restrict the result set (a `should` clause may be optional and a `must_not` clause
 * inverts the range), so it must not be used to decide which indices can be left out of the search.
 *
 * @param queryBuilder query to inspect, may be `null`
 * @return the first required range query, or `null` if there is none or the query type is not handled
 */
private fun findRangeQuery(queryBuilder: QueryBuilder?): RangeQueryBuilder? = when (queryBuilder) {
    null -> null
    is RangeQueryBuilder -> queryBuilder
    is BoolQueryBuilder ->
        (queryBuilder.must() + queryBuilder.filter())
            .asSequence()
            .mapNotNull { clause -> findRangeQuery(clause) }
            .firstOrNull()
    // Add handling for other query types if necessary (e.g., NestedQueryBuilder, etc.)
    else -> null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,5 +227,10 @@ class CrossClusterMonitorUtils {
return if (clusterName.isNotEmpty()) "$clusterName:$indexName"
else indexName
}

/**
 * Tells whether [index] is qualified with the name of a cluster other than the local one.
 *
 * @param index index expression, optionally qualified with a cluster name
 * @param clusterService source of the local cluster's name
 * @return `false` when [parseClusterName] yields an empty cluster name or that name equals the
 * local cluster's name, `true` otherwise
 */
fun isRemoteClusterIndex(index: String, clusterService: ClusterService): Boolean {
    val parsedClusterName = parseClusterName(index)
    // No cluster qualifier at all: the index belongs to the local cluster.
    if (parsedClusterName.isEmpty()) return false
    return parsedClusterName != clusterService.clusterName.value()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1032,19 +1032,33 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return createTestAlias(alias = alias, indices = randomAliasIndices(alias, numOfAliasIndices, includeWriteIndex))
}

/**
 * Variant of `createTestAlias` that takes a number of randomly generated indices together with
 * the mapping to hand over for them.
 *
 * The indices are generated with [randomAliasIndices] and the call is then forwarded to the
 * map-based `createTestAlias` overload, leaving its `createIndices` parameter at its default.
 *
 * @param alias name of the alias
 * @param numOfAliasIndices how many indices to generate for the alias
 * @param includeWriteIndex passed on to [randomAliasIndices]
 * @param indicesMapping mapping passed on to the map-based overload
 * @return whatever the map-based overload returns
 */
protected fun createTestAlias(
    alias: String = randomAlphaOfLength(10).lowercase(Locale.ROOT),
    numOfAliasIndices: Int = randomIntBetween(1, 10),
    includeWriteIndex: Boolean = true,
    indicesMapping: String,
): MutableMap<String, MutableMap<String, Boolean>> {
    val generatedIndices = randomAliasIndices(alias, numOfAliasIndices, includeWriteIndex)
    return createTestAlias(alias = alias, indices = generatedIndices, indicesMapping = indicesMapping)
}

protected fun createTestAlias(
alias: String = randomAlphaOfLength(10).lowercase(Locale.ROOT),
indices: Map<String, Boolean> = randomAliasIndices(
alias = alias,
num = randomIntBetween(1, 10),
includeWriteIndex = true
),
createIndices: Boolean = true
createIndices: Boolean = true,
indicesMapping: String = ""
): MutableMap<String, MutableMap<String, Boolean>> {
val indicesMap = mutableMapOf<String, Boolean>()
val indicesJson = jsonBuilder().startObject().startArray("actions")
indices.keys.map {
if (createIndices) createTestIndex(index = it, mapping = "")
if (createIndices) createTestIndex(index = it, indicesMapping)
val isWriteIndex = indices.getOrDefault(it, false)
indicesMap[it] = isWriteIndex
val indexMap = mapOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,60 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
assertEquals("Incorrect search result", 2, buckets.size)
}

/**
 * Dry-runs a bucket-level monitor whose search input targets an alias and whose query is a range
 * on `{{period_end}}||-10d`, then checks that the composite aggregation returns two buckets.
 */
fun `test execute bucket-level monitor with alias`() {
    // Mapping handed to createTestAlias for the indices behind the alias.
    val mappingForAliasIndices = """
        "properties" : {
        "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
        "test_field" : { "type" : "keyword" },
        "number" : { "type" : "keyword" }
        }
    """.trimIndent()
    val createdAlias = createTestAlias(
        alias = randomAlphaOfLength(10),
        numOfAliasIndices = 10,
        includeWriteIndex = true,
        indicesMapping = mappingForAliasIndices
    )
    val aliasName = createdAlias.keys.first()

    // "test_value_1" is inserted twice to verify aggregation: three docs, two distinct values.
    val sampleValues = listOf("test_value_1", "test_value_1", "test_value_2")
    insertSampleTimeSerializedData(aliasName, sampleValues)

    val timeRangeQuery = QueryBuilders.rangeQuery("test_strict_date_time")
        .gt("{{period_end}}||-10d")
        .lte("{{period_end}}")
        .format("epoch_millis")
    val termsSource = TermsValuesSourceBuilder("test_field").field("test_field")
    val compositeAgg = CompositeAggregationBuilder("composite_agg", listOf(termsSource))
    val searchSource = SearchSourceBuilder().size(0).query(timeRangeQuery).aggregation(compositeAgg)
    val input = SearchInput(indices = listOf(aliasName), query = searchSource)
    val triggerScript = """
        params.docCount > 0
    """.trimIndent()

    // Start from a random trigger and swap in a bucket selector over the composite aggregation.
    val generatedTrigger = randomBucketLevelTrigger()
    val bucketSelector = BucketSelectorExtAggregationBuilder(
        name = generatedTrigger.id,
        bucketsPathsMap = mapOf("docCount" to "_count"),
        script = Script(triggerScript),
        parentBucketPath = "composite_agg",
        filter = null
    )
    val trigger = generatedTrigger.copy(bucketSelector = bucketSelector)

    val monitor = createMonitor(
        randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))
    )
    val output = entityAsMap(executeMonitor(monitor.id, params = DRYRUN_MONITOR))

    assertEquals(monitor.name, output["monitor_name"])
    @Suppress("UNCHECKED_CAST")
    val inputResults = output.objectMap("input_results")["results"] as List<Map<String, Any>>
    val searchResult = inputResults.first()
    val compositeAggResult = searchResult.stringMap("aggregations")?.stringMap("composite_agg")
    @Suppress("UNCHECKED_CAST")
    val buckets = compositeAggResult?.get("buckets") as List<Map<String, Any>>
    assertEquals("Incorrect search result", 2, buckets.size)
}

fun `test execute bucket-level monitor returns search result with multi term agg`() {
val index = "test_index_1234"
indexDoc(
Expand Down

0 comments on commit 67be543

Please sign in to comment.