Skip to content

Commit

Permalink
Fixed monitor type checks.
Browse files Browse the repository at this point in the history
Signed-off-by: AWSHurneyt <hurneyt@amazon.com>
  • Loading branch information
AWSHurneyt committed May 23, 2024
1 parent 8597561 commit 5710f8b
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.transport.RemoteTransportException
import java.util.*
import kotlin.collections.HashMap

private val log = LogManager.getLogger(MonitorMetadataService::class.java)

Expand Down Expand Up @@ -185,10 +187,10 @@ object MonitorMetadataService :

suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata {
try {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
val monitorIndex = if (Monitor.MonitorType.valueOf(monitor.monitorType.toString().uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
val runContext = if (Monitor.MonitorType.valueOf(monitor.monitorType.toString().uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR)
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>)
else null
return if (runContext != null) {
Expand All @@ -208,10 +210,10 @@ object MonitorMetadataService :
createWithRunContext: Boolean,
workflowMetadataId: String? = null,
): MonitorMetadata {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
val monitorIndex = if (Monitor.MonitorType.valueOf(monitor.monitorType.toString().uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext)
val runContext = if (Monitor.MonitorType.valueOf(monitor.monitorType.toString().uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext)
createFullRunContext(monitorIndex)
else emptyMap()
return MonitorMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
val monitor = job as Monitor
val executionId = "${monitor.id}_${LocalDateTime.now(ZoneOffset.UTC)}_${UUID.randomUUID()}"
logger.info(
"Executing scheduled monitor - id: ${monitor.id}, type: ${monitor.monitorType.name}, periodStart: $periodStart, " +
"Executing scheduled monitor - id: ${monitor.id}, type: ${monitor.monitorType}, periodStart: $periodStart, " +
"periodEnd: $periodEnd, dryrun: $dryrun, executionId: $executionId"
)
val runResult = if (monitor.isBucketLevelMonitor()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.transport.TransportService
import java.time.Instant
import java.util.*

object QueryLevelMonitorRunner : MonitorRunner() {
private val logger = LogManager.getLogger(javaClass)
Expand Down Expand Up @@ -68,7 +69,7 @@ object QueryLevelMonitorRunner : MonitorRunner() {
for (trigger in monitor.triggers) {
val currentAlert = currentAlerts[trigger]
val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlert)
val triggerResult = when (monitor.monitorType) {
val triggerResult = when (Monitor.MonitorType.valueOf(monitor.monitorType.toString().uppercase(Locale.ROOT))) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR ->
monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
Expand All @@ -80,7 +81,7 @@ object QueryLevelMonitorRunner : MonitorRunner() {
else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
}
else ->
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType.name}.")
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType}.")
}

triggerResults[trigger.id] = triggerResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.opensearch.rest.RestResponse
import org.opensearch.rest.action.RestResponseListener
import java.io.IOException
import java.time.Instant
import java.util.*

private val log = LogManager.getLogger(RestIndexMonitorAction::class.java)

Expand Down Expand Up @@ -98,7 +99,7 @@ class RestIndexMonitorAction : BaseRestHandler() {
validateDataSources(monitor)
val monitorType = monitor.monitorType
val triggers = monitor.triggers
when (monitorType) {
when (Monitor.MonitorType.valueOf(monitor.monitorType.toString().uppercase(Locale.ROOT))) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR -> {
triggers.forEach {
if (it !is QueryLevelTrigger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.time.Instant
import java.util.*

private val log = LogManager.getLogger(TransportExecuteMonitorAction::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
Expand Down Expand Up @@ -82,7 +83,7 @@ class TransportExecuteMonitorAction @Inject constructor(
}
try {
log.info(
"Executing monitor from API - id: ${monitor.id}, type: ${monitor.monitorType.name}, " +
"Executing monitor from API - id: ${monitor.id}, type: ${monitor.monitorType}, " +
"periodStart: $periodStart, periodEnd: $periodEnd, dryrun: ${execMonitorRequest.dryrun}"
)
val monitorRunResult = runner.runJob(monitor, periodStart, periodEnd, execMonitorRequest.dryrun, transportService)
Expand Down Expand Up @@ -134,7 +135,7 @@ class TransportExecuteMonitorAction @Inject constructor(
false -> (execMonitorRequest.monitor as Monitor).copy(user = user)
}

if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
if (Monitor.MonitorType.valueOf(monitor.monitorType.toString().uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
try {
scope.launch {
if (!docLevelMonitorQueries.docLevelQueryIndexExists(monitor.dataSources)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.io.IOException
import java.time.Duration
import java.util.*

private val log = LogManager.getLogger(TransportIndexMonitorAction::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
Expand Down Expand Up @@ -525,7 +526,7 @@ class TransportIndexMonitorAction @Inject constructor(
throw t
}
try {
if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
if (Monitor.MonitorType.valueOf(request.monitor.monitorType.toString().uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy)
}
// When inserting queries in queryIndex we could update sourceToQueryIndexMapping
Expand Down Expand Up @@ -683,7 +684,7 @@ class TransportIndexMonitorAction @Inject constructor(
val (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor)
// Recreate runContext if metadata exists
// Delete and insert all queries from/to queryIndex
if (created == false && currentMonitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
if (!created && Monitor.MonitorType.valueOf(currentMonitor.monitorType.toString().uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
updatedMetadata = MonitorMetadataService.recreateRunContext(metadata, currentMonitor)
client.suspendUntil<Client, BulkByScrollResponse> {
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ import org.opensearch.rest.RestRequest
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.util.UUID
import java.util.*
import java.util.stream.Collectors

private val log = LogManager.getLogger(TransportIndexWorkflowAction::class.java)
Expand Down Expand Up @@ -400,7 +400,7 @@ class TransportIndexWorkflowAction @Inject constructor(
log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!")
}

if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
if (Monitor.MonitorType.valueOf(monitor.monitorType.toString().uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
val oldMonitorMetadata = MonitorMetadataService.getMetadata(monitor)
monitorMetadata = monitorMetadata.copy(sourceToQueryIndexMapping = oldMonitorMetadata!!.sourceToQueryIndexMapping)
}
Expand Down Expand Up @@ -554,7 +554,7 @@ class TransportIndexWorkflowAction @Inject constructor(
workflowMetadataId = workflowMetadata.id
)

if (created == false && monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
if (!created && Monitor.MonitorType.valueOf(monitor.monitorType.toString().uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
var updatedMetadata = MonitorMetadataService.recreateRunContext(monitorMetadata, monitor)
val oldMonitorMetadata = MonitorMetadataService.getMetadata(monitor)
updatedMetadata = updatedMetadata.copy(sourceToQueryIndexMapping = oldMonitorMetadata!!.sourceToQueryIndexMapping)
Expand Down Expand Up @@ -632,7 +632,7 @@ class TransportIndexWorkflowAction @Inject constructor(
* Returns list of indices for the given monitor depending on it's type
*/
private fun getMonitorIndices(monitor: Monitor): List<String> {
return when (monitor.monitorType) {
return when (Monitor.MonitorType.valueOf(monitor.monitorType.toString().uppercase(Locale.ROOT))) {
Monitor.MonitorType.DOC_LEVEL_MONITOR -> (monitor.inputs[0] as DocLevelMonitorInput).indices
Monitor.MonitorType.BUCKET_LEVEL_MONITOR -> monitor.inputs.flatMap { s -> (s as SearchInput).indices }
Monitor.MonitorType.QUERY_LEVEL_MONITOR -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
import org.opensearch.commons.alerting.model.action.ActionExecutionScope
import org.opensearch.commons.alerting.util.isBucketLevelMonitor
import org.opensearch.script.Script
import java.util.*
import kotlin.math.max

private val logger = LogManager.getLogger("AlertingUtils")
Expand Down Expand Up @@ -78,9 +79,9 @@ fun Destination.isAllowed(allowList: List<String>): Boolean = allowList.contains

fun Destination.isTestAction(): Boolean = this.type == DestinationType.TEST_ACTION

fun Monitor.isDocLevelMonitor(): Boolean = this.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR
fun Monitor.isDocLevelMonitor(): Boolean = Monitor.MonitorType.valueOf(this.monitorType.toString().uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR

fun Monitor.isQueryLevelMonitor(): Boolean = this.monitorType == Monitor.MonitorType.QUERY_LEVEL_MONITOR
fun Monitor.isQueryLevelMonitor(): Boolean = Monitor.MonitorType.valueOf(this.monitorType.toString().uppercase(Locale.ROOT)) == Monitor.MonitorType.QUERY_LEVEL_MONITOR

/**
* Since buckets can have multi-value keys, this converts the bucket key values to a string that can be used
Expand Down

0 comments on commit 5710f8b

Please sign in to comment.