Skip to content

Commit

Permalink
Skip processing metric via streaming if polling offset defined (#586)
Browse files Browse the repository at this point in the history
  • Loading branch information
mpalriwal-Netflix authored Oct 15, 2024
1 parent b300734 commit f4487a0
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ abstract class CloudWatchMetricsProcessor(
private[cloudwatch] val filteredNS =
registry.createId("atlas.cloudwatch.datapoints.filtered", "reason", "namespace")

/** The number of data points filtered out before caching due to defined polling offset . */
private[cloudwatch] val pollingDefinedMetric =
registry.createId("atlas.cloudwatch.datapoints.filtered", "reason", "polling")

/** The number of data points filtered out before caching due to not having a metric config. */
private[cloudwatch] val filteredMetric =
registry.createId("atlas.cloudwatch.datapoints.filtered", "reason", "metric")
Expand Down Expand Up @@ -188,25 +192,34 @@ abstract class CloudWatchMetricsProcessor(
var matchedDimensions = false
categories.foreach { tuple =>
val (category, _) = tuple
if (category.dimensionsMatch(dp.dimensions)) {
matchedDimensions = true
if (category.filter.isDefined && tuple._1.filter.get.matches(toTagMap(dp))) {
registry
.counter(filteredQuery.withTag("aws.namespace", dp.namespace))
.increment()
debugger.debugIncoming(
dp,
IncomingMatch.DroppedFilter,
receivedTimestamp,
Some(category)
if (category.pollOffset.isDefined) {
registry
.counter(
pollingDefinedMetric
.withTag("aws.namespace", dp.namespace)
.withTag("aws.metric", dp.metricName)
)
} else {
// finally passed the checks.
updateCache(dp, category, receivedTimestamp)
val delay = receivedTimestamp - dp.datapoint.timestamp().toEpochMilli
registry
.distributionSummary(receivedAge.withTag("aws.namespace", dp.namespace))
.record(delay)
} else {
if (category.dimensionsMatch(dp.dimensions)) {
matchedDimensions = true
if (category.filter.isDefined && tuple._1.filter.get.matches(toTagMap(dp))) {
registry
.counter(filteredQuery.withTag("aws.namespace", dp.namespace))
.increment()
debugger.debugIncoming(
dp,
IncomingMatch.DroppedFilter,
receivedTimestamp,
Some(category)
)
} else {
// finally passed the checks.
updateCache(dp, category, receivedTimestamp)
val delay = receivedTimestamp - dp.datapoint.timestamp().toEpochMilli
registry
.distributionSummary(receivedAge.withTag("aws.namespace", dp.namespace))
.record(delay)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,26 @@ class CWMPProcessSuite extends BaseCloudWatchMetricsProcessorSuite {
)
}

test("processDatapoints polling offset defined") {
processor.processDatapoints(
List(
makeFirehoseMetric(
"AWS/UT1",
"DailyMetricA",
List(Dimension.builder().name("MyTag").value("a").build()),
Array(24, 0, 3, 10),
"Count"
)
),
ts
)
assertPublished(List.empty)
assertCounters(
1,
filtered = Map("namespace" -> (0, "AWS/UT1"), "polling" -> (1, "DailyMetricA"))
)
}

test("processDatapoints missing tag") {
processor.processDatapoints(
List(
Expand Down

0 comments on commit f4487a0

Please sign in to comment.