Skip to content

Commit

Permalink
Use AtlasRegistry for High Resolution CW Polled Metrics (#581)
Browse files Browse the repository at this point in the history
Use AtlasRegistry for High Resolution CW Polled Metrics
  • Loading branch information
mpalriwal-Netflix authored Oct 14, 2024
1 parent 23dbe99 commit b300734
Show file tree
Hide file tree
Showing 14 changed files with 429 additions and 57 deletions.
2 changes: 1 addition & 1 deletion atlas-cloudwatch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ entire config directory. Fields under the key include:
See [Configuring Timeouts](#configuring-timeouts)
* `filter` an ASL query that will drop any data that matches.
* `poll-offset` indicates the data must be polled instead of expected via Firehose. Used for daily
metrics like S3 bucket aggregates or those with timestamps 2 hours older than wall clock.
metrics like S3 bucket aggregates, those with timestamps 2 hours older than wall clock, or high-resolution metrics.
* `dimensions` a list of zero or more dimensions to match on incoming CloudWatch data. See
[Configuring Dimensions](#configuring-dimensions)
* `metrics` a list of one or more metric conversion definitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,29 @@ abstract class CloudWatchMetricsProcessor(
.build()
}

/**
  * Converts a single polled data point into Atlas data points and hands each one to
  * `publishRouter.publishToRegistry`.
  *
  * Every metric definition in the category whose `name` equals the data point's metric
  * name is applied. A category can hold several definitions for one CloudWatch metric
  * name (presumably the case for distribution/timer style definitions, which are built
  * from a single config entry), so stopping at the first match would silently drop the
  * remaining statistics.
  *
  * @param datapoint
  *     The polled CloudWatch data point.
  * @param category
  *     The category supplying the metric definitions and the period used for conversion.
  * @param receivedTimestamp
  *     Timestamp passed through to `toAtlasDatapoint`.
  */
private[cloudwatch] def sendToRegistry(
  datapoint: FirehoseMetric,
  category: MetricCategory,
  receivedTimestamp: Long
): Unit = {
  val definitions = category.metrics.filter(_.name == datapoint.metricName)
  if (definitions.nonEmpty) {
    // The dimensions are the same for every definition, build them once.
    val dimensions = toAWSDimensions(datapoint)
    definitions.foreach { md =>
      val metric = MetricData(
        MetricMetadata(category, md, dimensions),
        None,
        Option(datapoint.datapoint),
        None
      )

      val atlasDp = toAtlasDatapoint(metric, receivedTimestamp, category.period)

      // NaN means the conversion produced no usable value, so nothing is published.
      if (!atlasDp.value.isNaN) {
        publishRouter.publishToRegistry(atlasDp)
      }
    }
  }
}

private[cloudwatch] def sendToRouter(key: Any, data: Array[Byte], scrapeTimestamp: Long): Unit = {
try {
val entry = CloudWatchCacheEntry.parseFrom(data)
Expand Down Expand Up @@ -839,8 +862,12 @@ abstract class CloudWatchMetricsProcessor(
// TODO - clean this out once tests are finished
// (if (testMode) s"TEST.${definition.alias}" else definition.alias))
definition.alias)
val newValue =
definition.conversion(metric.meta, metric.datapoint(Instant.ofEpochMilli(timestamp)))

val newValue = definition.conversion(
metric.meta,
metric.datapoint(Instant.ofEpochMilli(timestamp))
)

// NOTE - the polling CW code uses now for the timestamp, likely for LWC. BUT data could be off by
// minutes potentially. And it didn't account for the offset value as far as I could tell. Now we'll at least
// use the offset value.
Expand Down Expand Up @@ -1024,6 +1051,16 @@ object CloudWatchMetricsProcessor {
}.toList
}

/**
  * Rebuilds each dimension of the polled metric as an AWS SDK `Dimension`, copying the
  * name and value unchanged.
  *
  * @param entry
  *     The polled metric whose dimensions are copied.
  * @return
  *     One `Dimension` per entry dimension, as a list.
  */
private[cloudwatch] def toAWSDimensions(entry: FirehoseMetric): List[Dimension] = {
  val rebuilt = for (dim <- entry.dimensions) yield {
    Dimension.builder().name(dim.name()).value(dim.value()).build()
  }
  rebuilt.toList
}

/**
* Converts the dimensions from the Protobuf entry to a sorted tag map to use in evaluating against a
* Query filter from a [MetricCategory] config. Note that the `name` and `aws.namespace` tags are populated per
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package com.netflix.atlas.cloudwatch

import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import com.netflix.atlas.cloudwatch.CloudWatchMetricsProcessor.normalize
import com.netflix.atlas.cloudwatch.CloudWatchMetricsProcessor.toTagMap
import com.netflix.atlas.cloudwatch.CloudWatchPoller.runAfter
Expand All @@ -28,6 +26,8 @@ import com.netflix.spectator.api.Registry
import com.netflix.spectator.api.patterns.PolledMeter
import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging
import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Source
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient
Expand Down Expand Up @@ -101,6 +101,7 @@ class CloudWatchPoller(
private val dpsDroppedFilter =
registry.counter("atlas.cloudwatch.poller.dps.dropped", "reason", "filter")
private val dpsExpected = registry.counter("atlas.cloudwatch.poller.dps.expected")
private val hrmRequest = registry.createId("atlas.cloudwatch.poller.hrm.request")
private val dpsPolled = registry.counter("atlas.cloudwatch.poller.dps.polled")
private val frequency = config.getDuration("atlas.cloudwatch.poller.frequency").getSeconds
private val hrmFrequency = config.getDuration("atlas.cloudwatch.poller.hrmFrequency").getSeconds
Expand All @@ -116,7 +117,9 @@ class CloudWatchPoller(
val offset = category.pollOffset.get.getSeconds.toInt
val categories = map.getOrElse(offset, List.empty)
map += offset -> (categories :+ category)
logger.info(s"Setting offset of ${offset}s for categories ${category.namespace}")
logger.info(
s"Setting offset of ${offset}s for ns ${category.namespace} period ${category.period}"
)
}
logger.info(s"Loaded ${map.size} polling offsets")
map
Expand Down Expand Up @@ -281,7 +284,7 @@ class CloudWatchPoller(

private[cloudwatch] def execute: Future[Done] = {
logger.info(
s"Polling for account ${account} at ${offset}s and category ${category.namespace} in ${region}"
s"Polling for account ${account} at ${offset}s and ns ${category.namespace} period ${category.period} in ${region}"
)
val futures = category.toListRequests.map { tuple =>
val (definition, request) = tuple
Expand Down Expand Up @@ -400,6 +403,12 @@ class CloudWatchPoller(
val start = now.minusSeconds(minCacheEntries * category.period)
val request = MetricMetadata(category, definition, metric.dimensions.asScala.toList)
.toGetRequest(start, now)

if (request.period() < 60) {
registry
.counter(hrmRequest.withTags("period", request.period().toString))
.increment()
}
val response = client.getMetricStatistics(request)
val dimensions = request.dimensions().asScala.toList.toBuffer
dimensions += Dimension.builder().name("nf.account").value(account).build()
Expand Down Expand Up @@ -428,7 +437,11 @@ class CloudWatchPoller(
dimensions.toList,
dp
)
processor.updateCache(m, category, nowMillis)
if (category.period < 60) {
processor.sendToRegistry(m, category, nowMillis)
} else {
processor.updateCache(m, category, nowMillis)
}
}
got.incrementAndGet()
promise.success(Done)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ object Conversions {
unit(from(name, conversions))
}

private def from(name: String, conversions: List[String]): Conversion = {
private def from(
name: String,
conversions: List[String]
): Conversion = {
conversions match {
case "min" :: Nil => min
case "max" :: Nil => max
Expand Down Expand Up @@ -96,7 +99,15 @@ object Conversions {
private def rate(f: Conversion): Conversion = (m, d) => {
  val v = f(m, d)
  val period = m.category.period

  // unitAsString() can return null when the data point carries no unit; treat that
  // the same as a unit that is not per-second instead of failing on the null.
  val perSecond = Option(d.unitAsString()).exists(_.endsWith("/Second"))

  if (period >= 60) {
    // Categories with a period of a minute or more are reported as a per-second rate.
    if (perSecond) v else v / period
  } else {
    // Sub-minute categories skip the rate conversion: a per-second value is scaled
    // back up to the total for the period, anything else is passed through as is.
    if (perSecond) v * period else v
  }
}

/** Modifies a conversion `f` to multiply the result by `v`. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ object MetricDefinition {
* Note, count must come first so it is easy to skip for other unit conversions. See
* [[milliTimer]] for example.
*/
private def newDist(config: Config, total: String, tags: Tags): List[MetricDefinition] = {
private def newDist(
config: Config,
total: String,
tags: Tags
): List[MetricDefinition] = {
List(
newMetricDef(config, "count,rate", tags + ("statistic" -> "count")),
newMetricDef(config, "sum,rate", tags + ("statistic" -> total)),
Expand All @@ -96,14 +100,21 @@ object MetricDefinition {
* Timer where the input unit is milliseconds and we need to perform an unit conversion
* on the totalTime and max results.
*/
private def milliTimer(config: Config, tags: Tags): List[MetricDefinition] = {
private def milliTimer(config: Config, tags: Tags): List[MetricDefinition] = {
  val definitions = newDist(config, "totalTime", tags)
  // The first definition is kept untouched; only the remaining ones get the
  // milliseconds unit conversion wrapped around their existing conversion.
  val first = definitions.head
  val converted = for (definition <- definitions.tail) yield {
    val wrapped = Conversions.toUnit(definition.conversion, StandardUnit.MILLISECONDS)
    definition.copy(conversion = wrapped)
  }
  first :: converted
}

private def newMetricDef(config: Config, cnv: String, tags: Tags): MetricDefinition = {
private def newMetricDef(
config: Config,
cnv: String,
tags: Tags
): MetricDefinition = {
val dstype = Map(TagKey.dsType -> Conversions.determineDsType(cnv))
val monotonic = config.hasPath("monotonic") && config.getBoolean("monotonic")
MetricDefinition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package com.netflix.atlas.cloudwatch

import com.netflix.atlas.cloudwatch.poller.DoubleValue
import com.netflix.atlas.cloudwatch.poller.PublishClient
import com.netflix.atlas.cloudwatch.poller.PublishConfig
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.model.HttpEntity
Expand All @@ -28,7 +31,9 @@ import com.netflix.atlas.pekko.PekkoHttpClient
import com.netflix.atlas.pekko.CustomMediaTypes
import com.netflix.atlas.pekko.StreamOps
import com.netflix.atlas.core.model.Datapoint
import com.netflix.atlas.core.model.DsType
import com.netflix.atlas.json.Json
import com.netflix.iep.leader.api.LeaderStatus
import com.netflix.spectator.api.Functions
import com.netflix.spectator.api.Id
import com.netflix.spectator.api.Registry
Expand Down Expand Up @@ -57,6 +62,9 @@ class PublishQueue(
registry: Registry,
val stack: String,
val uri: String,
val configUri: String,
val evalUri: String,
val status: LeaderStatus,
httpClient: PekkoHttpClient,
scheduler: ScheduledExecutorService
)(implicit system: ActorSystem)
Expand All @@ -75,6 +83,10 @@ class PublishQueue(
private val batchSize = getSetting("batchSize")
private val batchTimeout = getDurationSetting("batchTimeout")

private val registryPublishClient = new PublishClient(
new PublishConfig(config, uri, configUri, evalUri, status = status, registry = registry)
)

private val lastUpdateTimestamp =
PolledMeter
.using(registry)
Expand All @@ -90,14 +102,28 @@ class PublishQueue(
.map(publish)
.toMat(Sink.ignore)(Keep.left)
.run()
logger.info(s"Setup queue for stack ${stack} publishing URI ${uri}")

/**
* Adds the data point to the queue if there is room. Increments a metric if not.
*
* @param datapoint
* The non-null data point to enqueue.
*/

logger.info(
s"Setup queue for stack ${stack} publishing URI ${uri}, lwc-config URI ${configUri}, eval URI ${evalUri}"
)

/**
  * Forwards the data point to the registry publish client according to its data source
  * type: `DsType.Rate` updates a counter and `DsType.Gauge` updates a gauge. Any other
  * type is logged at error level and the data point is skipped.
  *
  * @param dp
  *     The data point to record.
  */
def updateRegistry(dp: AtlasDatapoint): Unit = {
  val converted = toDoubleValue(dp)
  dp.dsType match {
    case DsType.Rate =>
      registryPublishClient.updateCounter(converted.id, converted.value)
    case DsType.Gauge =>
      registryPublishClient.updateGauge(converted.id, converted.value)
    case _ =>
      logger.error(s"Unknown ds type, skip updating registry ${dp.dsType}")
  }
}

/** Builds a [[DoubleValue]] from the data point; only its tags and value are carried over. */
private def toDoubleValue(datapoint: AtlasDatapoint): DoubleValue = {
  val tags = datapoint.tags
  val value = datapoint.value
  DoubleValue(tags, value)
}

/**
  * Offers the data point to the publish queue. Whatever `offer` returns is discarded.
  *
  * @param datapoint
  *     The data point to enqueue.
  */
def enqueue(datapoint: AtlasDatapoint): Unit = {
  publishQueue.offer(datapoint)
}

private[cloudwatch] def publish(datapoints: Seq[AtlasDatapoint]): Future[NotUsed] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.apache.pekko.actor.ActorSystem
import com.netflix.atlas.pekko.PekkoHttpClient
import com.netflix.atlas.cloudwatch.PublishRouter.defaultKey
import com.netflix.iep.config.NetflixEnvironment
import com.netflix.iep.leader.api.LeaderStatus
import com.netflix.spectator.api.Registry
import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging
Expand All @@ -30,12 +31,15 @@ class PublishRouter(
config: Config,
registry: Registry,
tagger: Tagger,
httpClient: PekkoHttpClient
httpClient: PekkoHttpClient,
val status: LeaderStatus
)(implicit system: ActorSystem)
extends StrictLogging {

private val schedulers = Executors.newScheduledThreadPool(2)
private val baseURI = config.getString("atlas.cloudwatch.account.routing.uri")
private val baseConfigURI = config.getString("atlas.cloudwatch.account.routing.config-uri")
private val baseEvalURI = config.getString("atlas.cloudwatch.account.routing.eval-uri")

private val missingAccount =
registry.counter("atlas.cloudwatch.queue.dps.dropped", "reason", "missingAccount")
Expand All @@ -47,6 +51,13 @@ class PublishRouter(
baseURI
.replaceAll("\\$\\{STACK\\}", "main")
.replaceAll("\\$\\{REGION}", NetflixEnvironment.region()),
baseConfigURI
.replaceAll("\\$\\{STACK\\}", "main")
.replaceAll("\\$\\{REGION}", NetflixEnvironment.region()),
baseEvalURI
.replaceAll("\\$\\{STACK\\}", "main")
.replaceAll("\\$\\{REGION}", NetflixEnvironment.region()),
status,
httpClient,
schedulers
)
Expand Down Expand Up @@ -85,6 +96,13 @@ class PublishRouter(
baseURI
.replaceAll("\\$\\{STACK\\}", stack)
.replaceAll("\\$\\{REGION}", destination),
baseConfigURI
.replaceAll("\\$\\{STACK\\}", stack)
.replaceAll("\\$\\{REGION}", destination),
baseEvalURI
.replaceAll("\\$\\{STACK\\}", stack)
.replaceAll("\\$\\{REGION}", destination),
status,
httpClient,
schedulers
)
Expand All @@ -99,6 +117,13 @@ class PublishRouter(
baseURI
.replaceAll("\\$\\{STACK\\}", stack)
.replaceAll("\\$\\{REGION}", NetflixEnvironment.region()),
baseConfigURI
.replaceAll("\\$\\{STACK\\}", stack)
.replaceAll("\\$\\{REGION}", NetflixEnvironment.region()),
baseEvalURI
.replaceAll("\\$\\{STACK\\}", stack)
.replaceAll("\\$\\{REGION}", NetflixEnvironment.region()),
status,
httpClient,
schedulers
)
Expand All @@ -124,12 +149,25 @@ class PublishRouter(
}
}

private[cloudwatch] def getQueue(datapoint: AtlasDatapoint): Option[PublishQueue] = {
datapoint.tags.get("nf.account") match {
/**
  * Looks up the publish queue for the data point via `getQueue` and records the data
  * point in that queue's registry. When no queue is found, the `missingAccount` counter
  * is incremented and the data point is dropped.
  *
  * @param datapoint
  *     The non-null data point.
  */
def publishToRegistry(datapoint: AtlasDatapoint): Unit = {
  getQueue(datapoint).fold(missingAccount.increment())(_.updateRegistry(datapoint))
}

private[cloudwatch] def getQueue(tags: Map[String, String]): Option[PublishQueue] = {
tags.get("nf.account") match {
case Some(account) =>
accountMap.get(account) match {
case Some(regionMap) =>
val region = datapoint.tags.get("nf.region").getOrElse(defaultKey)
val region = tags.get("nf.region").getOrElse(defaultKey)
regionMap.get(region) match {
case Some(queue) => Some(queue)
case None => regionMap.get(defaultKey)
Expand All @@ -140,6 +178,10 @@ class PublishRouter(
}
}

/** Convenience overload that resolves the queue from the data point's tags. */
private[cloudwatch] def getQueue(datapoint: AtlasDatapoint): Option[PublishQueue] = {
  val tags = datapoint.tags
  getQueue(tags)
}

/**
  * Stops the shared scheduler pool immediately via `shutdownNow()`. The list of tasks
  * that never started, which `shutdownNow()` returns, is discarded.
  */
def shutdown(): Unit = schedulers.shutdownNow()
Expand Down
Loading

0 comments on commit b300734

Please sign in to comment.