Skip to content

Commit

Permalink
Add latency metric
Browse files: browse the repository at this point in the history
  • Loading branch information
istreeter committed Oct 24, 2023
1 parent bc0925e commit 798b621
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import com.snowplowanalytics.snowplow.runtime.{Metrics => CommonMetrics}
/**
 * Interface through which the application records its metrics.
 *
 * Every recording method returns an effect in `F`; nothing happens until that effect is run.
 */
trait Metrics[F[_]] {

  /** Adds `count` to the tally of good events. */
  def addGood(count: Int): F[Unit]

  /** Adds `count` to the tally of bad events. */
  def addBad(count: Int): F[Unit]

  /**
   * Records an observed latency, in milliseconds.
   *
   * The implementation in this file retains the largest value it has been given rather than
   * overwriting the previous one.
   */
  def setLatencyMillis(latencyMillis: Long): F[Unit]

  /**
   * A stream that emits no elements (its element type is `Nothing`); it exists to be run for its
   * effects. NOTE(review): presumably this is what pushes the accumulated metrics out -- confirm
   * against `CommonMetrics`.
   */
  def report: Stream[F, Nothing]
}
Expand All @@ -28,17 +29,19 @@ object Metrics {

private case class State(
good: Int,
bad: Int
bad: Int,
latencyMillis: Long
) extends CommonMetrics.State {
def toKVMetrics: List[CommonMetrics.KVMetric] =
List(
KVMetric.CountGood(good),
KVMetric.CountBad(bad)
KVMetric.CountBad(bad),
KVMetric.LatencyMillis(latencyMillis)
)
}

private object State {
def empty: State = State(0, 0)
def empty: State = State(0, 0, 0L)
}

private def impl[F[_]: Async](config: Config.Metrics, ref: Ref[F, State]): Metrics[F] =
Expand All @@ -47,6 +50,8 @@ object Metrics {
ref.update(s => s.copy(good = s.good + count))
/** Atomically increases the running count of bad events by `count`. */
def addBad(count: Int): F[Unit] =
  ref.update { current =>
    current.copy(bad = current.bad + count)
  }
/**
 * Atomically records a latency observation.
 *
 * The stored value only ever moves upwards: it becomes the larger of the value already held and
 * `latencyMillis`, so a smaller observation leaves the state unchanged.
 */
def setLatencyMillis(latencyMillis: Long): F[Unit] =
  ref.update { current =>
    current.copy(latencyMillis = math.max(current.latencyMillis, latencyMillis))
  }
}

private object KVMetric {
Expand All @@ -63,5 +68,11 @@ object Metrics {
val metricType = CommonMetrics.MetricType.Count
}

/**
 * Key/value metric carrying a latency in milliseconds.
 *
 * Unlike the neighbouring count metrics it is reported with the `Gauge` metric type.
 *
 * @param v the latency, in milliseconds, rendered as a decimal string in `value`
 */
final case class LatencyMillis(v: Long) extends CommonMetrics.KVMetric {
  val metricType = CommonMetrics.MetricType.Gauge
  val key = "latency_millis"
  val value = v.toString
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Processor => BadRowProcessor}
import com.snowplowanalytics.snowplow.badrows.Payload.{RawPayload => BadRowRawPayload}
import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, TokenedEvents}
import com.snowplowanalytics.snowplow.snowflake.{Config, Environment}
import com.snowplowanalytics.snowplow.snowflake.{Config, Environment, Metrics}
import com.snowplowanalytics.snowplow.loaders.transform.Transform

object Processing {
Expand Down Expand Up @@ -107,7 +107,8 @@ object Processing {
): EventProcessor[F] = { in =>
val badProcessor = BadRowProcessor(env.appInfo.name, env.appInfo.version)

in.through(parseBytes(badProcessor))
in.through(setLatency(env.metrics))
.through(parseBytes(badProcessor))
.through(transform(badProcessor))
.through(batchUp(env.batching))
.through(writeToSnowflake(env, badProcessor))
Expand All @@ -116,6 +117,20 @@ object Processing {
.through(emitTokens)
}

/**
 * Pipe that records the latency of each incoming batch and passes the batch through unchanged.
 *
 * For a batch that carries an `earliestSourceTstamp`, the latency is the current wall-clock time
 * minus that timestamp, in milliseconds. Batches without a timestamp leave the metric untouched.
 */
private def setLatency[F[_]: Sync](metrics: Metrics[F]): Pipe[F, TokenedEvents, TokenedEvents] =
  _.evalTap { batch =>
    batch.earliestSourceTstamp.fold(Applicative[F].unit) { sourceTstamp =>
      Sync[F].realTime.flatMap { now =>
        metrics.setLatencyMillis(now.toMillis - sourceTstamp.toEpochMilli)
      }
    }
  }

/** Parse raw bytes into Event using analytics sdk */
private def parseBytes[F[_]: Monad](badProcessor: BadRowProcessor): Pipe[F, TokenedEvents, ParsedBatch] =
_.evalMap { case TokenedEvents(list, token, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ object MockEnvironment {
/** A write of `rowCount` rows. NOTE(review): presumably appended by the mock Snowflake channel -- confirm. */
case class WroteRowsToSnowflake(rowCount: Int) extends Action
/** Appended by the mock metrics each time `addGood` is called; `count` is the argument it received. */
case class AddedGoodCountMetric(count: Int) extends Action
/** Bad-event counterpart of `AddedGoodCountMetric`. NOTE(review): presumably appended by the mock `addBad` -- confirm. */
case class AddedBadCountMetric(count: Int) extends Action
/** Appended by the mock metrics each time `setLatencyMillis` is called; `millis` is the argument it received. */
case class SetLatencyMetric(millis: Long) extends Action
}
import Action._

Expand Down Expand Up @@ -151,6 +152,9 @@ object MockEnvironment {
/** Mock implementation: appends an `AddedGoodCountMetric` to the recorded actions instead of counting. */
def addGood(count: Int): IO[Unit] =
  ref.update { recorded =>
    recorded :+ AddedGoodCountMetric(count)
  }

/**
 * Mock implementation: appends a `SetLatencyMetric` to the recorded actions, so tests can assert
 * on every latency value that was passed in.
 */
def setLatencyMillis(latencyMillis: Long): IO[Unit] =
  ref.update { recorded =>
    recorded :+ SetLatencyMetric(latencyMillis)
  }

// The mock reports nothing: `Stream.never` emits no elements and never terminates.
def report: Stream[IO, Nothing] = Stream.never[IO]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ import cats.effect.IO
import fs2.Stream
import org.specs2.Specification
import cats.effect.testing.specs2.CatsEffect
import cats.effect.testkit.TestControl
import net.snowflake.ingest.utils.{ErrorCode, SFException}

import java.nio.charset.StandardCharsets
import java.nio.ByteBuffer
import java.time.Instant
import scala.concurrent.duration.DurationLong

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.snowflake.MockEnvironment
Expand All @@ -33,6 +36,7 @@ class ProcessingSpec extends Specification with CatsEffect {
Emit BadRows when the ChannelProvider reports a problem with the data $e5
Abort processing and don't ack events when the ChannelProvider reports a runtime error $e6
Reset the Channel when the ChannelProvider reports the channel has become invalid $e7
Set the latency metric based off the message timestamp $e8
"""

def e1 =
Expand Down Expand Up @@ -185,6 +189,35 @@ class ProcessingSpec extends Specification with CatsEffect {
)
}

/**
 * Checks that each incoming batch sets the latency metric to the gap between the batch's source
 * timestamp and the time at which it is processed.
 */
def e8 = {
  val sourceTstamp = Instant.parse("2023-10-24T10:00:00.000Z")
  val wallClock = Instant.parse("2023-10-24T10:00:42.123Z")
  // wallClock minus sourceTstamp, in milliseconds
  val expectedLatency = 42123L

  val program =
    for {
      generated <- generateEvents.take(2).compile.toList
      inputs = generated.map(_.copy(earliestSourceTstamp = Some(sourceTstamp)))
      control <- MockEnvironment.build(inputs)
      // TestControl runs on a virtual clock, so this sleep costs no real time. It advances the
      // clock so that processing happens at `wallClock` (assuming the virtual clock starts at the
      // epoch).
      _ <- IO.sleep(wallClock.toEpochMilli.millis)
      _ <- Processing.stream(control.environment).compile.drain
      state <- control.state.get
    } yield state should beEqualTo(
      Vector(
        Action.SetLatencyMetric(expectedLatency),
        Action.SetLatencyMetric(expectedLatency),
        Action.WroteRowsToSnowflake(4),
        Action.AddedGoodCountMetric(4),
        Action.AddedBadCountMetric(0),
        Action.Checkpointed(List(inputs(0).ack, inputs(1).ack))
      )
    )

  TestControl.executeEmbed(program)
}

}

object ProcessingSpec {
Expand Down
9 changes: 6 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ object Dependencies {

object V {
// Scala
val catsEffect = "3.5.0"
val catsRetry = "3.1.0"
val http4s = "0.23.15"
val decline = "2.4.1"
Expand All @@ -27,7 +28,7 @@ object Dependencies {
val awsSdk2 = "2.20.135"

// Snowplow
val streams = "0.1.0-M6"
val streams = "0.1.0-M7"

// tests
val specs2 = "4.20.0"
Expand Down Expand Up @@ -58,8 +59,9 @@ object Dependencies {
val runtime = "com.snowplowanalytics" %% "runtime-common" % V.streams

// tests
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
val catsEffectSpecs2 = "org.typelevel" %% "cats-effect-testing-specs2" % V.catsEffectSpecs2 % Test
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
val catsEffectSpecs2 = "org.typelevel" %% "cats-effect-testing-specs2" % V.catsEffectSpecs2 % Test
val catsEffectTestkit = "org.typelevel" %% "cats-effect-testkit" % V.catsEffect % Test

val coreDependencies = Seq(
streamsCore,
Expand All @@ -74,6 +76,7 @@ object Dependencies {
circeGenericExtra,
specs2,
catsEffectSpecs2,
catsEffectTestkit,
slf4j % Test
)

Expand Down

0 comments on commit 798b621

Please sign in to comment.