diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala index afcd8ab..7a89361 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala @@ -14,7 +14,7 @@ import fs2.Stream import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents} import com.snowplowanalytics.snowplow.sinks.Sink -import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, TableManager} +import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, SnowflakeHealth, TableManager} import com.snowplowanalytics.snowplow.runtime.AppInfo import scala.concurrent.duration.{DurationInt, FiniteDuration} @@ -32,9 +32,20 @@ object MockEnvironment { case object ClosedChannel extends Action case object OpenedChannel extends Action case class WroteRowsToSnowflake(rowCount: Int) extends Action + + /** Metrics * */ case class AddedGoodCountMetric(count: Int) extends Action case class AddedBadCountMetric(count: Int) extends Action case class SetLatencyMetric(millis: Long) extends Action + + /** Alerts * */ + case object SentFailedToCreateEventsTableAlert extends Action + case object SentFailedToAddColumnsAlert extends Action + case object SentFailedToOpenSnowflakeChannelAlert extends Action + + /** Health probe * */ + case object BecomeHealthy extends Action + case object BecomeUnhealthy extends Action } import Action._ @@ -51,16 +62,21 @@ object MockEnvironment { def build(inputs: List[TokenedEvents], channelResponses: List[Channel.WriteResult] = Nil): IO[MockEnvironment] = for { state <- Ref[IO].of(Vector.empty[Action]) - channel <- testChannel(state, channelResponses) + channelProvider <- testChannelProvider(state, channelResponses) } yield { val env = Environment( - appInfo = appInfo, - source = testSourceAndAck(inputs, state), - badSink = testSink(state), - httpClient = testHttpClient, - tableManager = testTableManager(state), - channel = channel, - metrics = testMetrics(state), + appInfo = appInfo, + source = testSourceAndAck(inputs, state), + badSink = testSink(state), + httpClient = testHttpClient, + tableManager = testTableManager(state), + channelProvider = channelProvider, + metrics = testMetrics(state), + monitoring = testMonitoring(state), + snowflakeHealth = testSnowflakeHealth(state), + retries = Config.Retries( + backoff = 30.seconds + ), batching = Config.Batching( maxBytes = 16000000, maxDelay = 10.seconds, @@ -112,7 +128,7 @@ object MockEnvironment { } /** - * Mocked implementation of a `Channel` + * Mocked implementation of a `Channel.Provider` * * @param actionRef * Global Ref used to accumulate actions that happened @@ -120,32 +136,35 @@ object MockEnvironment { * Responses that this mocked Channel should return each time someone calls `write`. If no * responses given, then it will return with a successful response. */ - private def testChannel( + private def testChannelProvider( actionRef: Ref[IO, Vector[Action]], responses: List[Channel.WriteResult] - ): IO[Resource[IO, Channel[IO]]] = + ): IO[Channel.Provider[IO]] = for { responseRef <- Ref[IO].of(responses) - } yield { - val make = actionRef.update(_ :+ OpenedChannel).as { - new Channel[IO] { - def write(rows: Iterable[Map[String, AnyRef]]): IO[Channel.WriteResult] = - for { - response <- responseRef.modify { - case head :: tail => (tail, head) - case Nil => (Nil, Channel.WriteResult.WriteFailures(Nil)) - } - _ <- response match { - case Channel.WriteResult.WriteFailures(failures) => - actionRef.update(_ :+ WroteRowsToSnowflake(rows.size - failures.size)) - case Channel.WriteResult.ChannelIsInvalid => - IO.unit - } - } yield response - } - } - - Resource.make(make)(_ => actionRef.update(_ :+ ClosedChannel)) + } yield new Channel.Provider[IO] { + def open: IO[Channel[IO]] = + actionRef.update(_ :+ OpenedChannel).as(testChannel(actionRef, responseRef)) + } + + private def testChannel(actionRef: Ref[IO, Vector[Action]], responseRef: Ref[IO, List[Channel.WriteResult]]): Channel[IO] = + new Channel[IO] { + def write(rows: Iterable[Map[String, AnyRef]]): IO[Channel.WriteResult] = + for { + response <- responseRef.modify { + case head :: tail => (tail, head) + case Nil => (Nil, Channel.WriteResult.WriteFailures(Nil)) + } + _ <- response match { + case Channel.WriteResult.WriteFailures(failures) => + actionRef.update(_ :+ WroteRowsToSnowflake(rows.size - failures.size)) + case Channel.WriteResult.ChannelIsInvalid => + IO.unit + } + } yield response + + def close: IO[Unit] = + actionRef.update(_ :+ ClosedChannel) } def testMetrics(ref: Ref[IO, Vector[Action]]): Metrics[IO] = new Metrics[IO] { @@ -160,4 +179,19 @@ object MockEnvironment { def report: Stream[IO, Nothing] = Stream.never[IO] } + + def testMonitoring(ref: Ref[IO, Vector[Action]]): Monitoring[IO] = new Monitoring[IO] { + def alert(message: Alert): IO[Unit] = message match { + case Alert.FailedToCreateEventsTable(_) => ref.update(_ :+ SentFailedToCreateEventsTableAlert) + case Alert.FailedToAddColumns(_, _) => ref.update(_ :+ SentFailedToAddColumnsAlert) + case Alert.FailedToOpenSnowflakeChannel(_) => ref.update(_ :+ SentFailedToOpenSnowflakeChannelAlert) + } + } + + def testSnowflakeHealth(ref: Ref[IO, Vector[Action]]): SnowflakeHealth[IO] = new SnowflakeHealth[IO] { + def setHealthy(): IO[Unit] = + ref.update(_ :+ BecomeHealthy) + def setUnhealthy(): IO[Unit] = + ref.update(_ :+ BecomeUnhealthy) + } } diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala index 7394c10..e7980d1 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala @@ -49,6 +49,7 @@ class ProcessingSpec extends Specification with CatsEffect { Vector( Action.InitEventsTable, Action.OpenedChannel, + Action.BecomeHealthy, Action.WroteRowsToSnowflake(4), Action.AddedGoodCountMetric(4), Action.AddedBadCountMetric(0), @@ -87,6 +88,7 @@ class ProcessingSpec extends Specification with CatsEffect { Vector( Action.InitEventsTable, Action.OpenedChannel, + Action.BecomeHealthy, Action.WroteRowsToSnowflake(6), Action.SentToBad(6), Action.AddedGoodCountMetric(6), @@ -115,10 +117,12 @@ class ProcessingSpec extends Specification with CatsEffect { Vector( Action.InitEventsTable, Action.OpenedChannel, + Action.BecomeHealthy, Action.WroteRowsToSnowflake(1), Action.ClosedChannel, Action.AlterTableAddedColumns(List("unstruct_event_xyz_1", "contexts_abc_2")), Action.OpenedChannel, + Action.BecomeHealthy, Action.WroteRowsToSnowflake(1), Action.AddedGoodCountMetric(2), Action.AddedBadCountMetric(0), @@ -147,6 +151,7 @@ class ProcessingSpec extends Specification with CatsEffect { Vector( Action.InitEventsTable, Action.OpenedChannel, + Action.BecomeHealthy, Action.WroteRowsToSnowflake(1), Action.SentToBad(1), Action.AddedGoodCountMetric(1), @@ -176,6 +181,7 @@ class ProcessingSpec extends Specification with CatsEffect { Vector( Action.InitEventsTable, Action.OpenedChannel, + Action.BecomeHealthy, Action.WroteRowsToSnowflake(1), Action.ClosedChannel ) @@ -197,8 +203,10 @@ class ProcessingSpec extends Specification with CatsEffect { Vector( Action.InitEventsTable, Action.OpenedChannel, + Action.BecomeHealthy, Action.ClosedChannel, Action.OpenedChannel, + Action.BecomeHealthy, Action.WroteRowsToSnowflake(2), Action.AddedGoodCountMetric(2), Action.AddedBadCountMetric(0), @@ -228,6 +236,7 @@ class ProcessingSpec extends Specification with CatsEffect { Action.SetLatencyMetric(42123), Action.SetLatencyMetric(42123), Action.OpenedChannel, + Action.BecomeHealthy, Action.WroteRowsToSnowflake(4), Action.AddedGoodCountMetric(4), Action.AddedBadCountMetric(0),