From 17e42ca0e94f4b33e1241f09361329c59aedd7a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Poniedzia=C5=82ek?= Date: Thu, 4 Jan 2024 10:53:04 +0100 Subject: [PATCH] Extend app health with bad sink --- .../AppHealth.scala | 69 ++++-- .../Environment.scala | 18 +- .../processing/Channel.scala | 6 +- .../processing/Processing.scala | 49 +++-- .../processing/SnowflakeHealth.scala | 20 -- .../processing/SnowflakeRetrying.scala | 12 +- .../processing/TableManager.scala | 8 +- .../MockEnvironment.scala | 109 ++++++---- .../processing/ChannelProviderSpec.scala | 50 +++-- .../processing/ProcessingSpec.scala | 204 ++++++++++++------ 10 files changed, 340 insertions(+), 205 deletions(-) delete mode 100644 modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeHealth.scala diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala index 86d8a0b..6a473c9 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala @@ -1,26 +1,41 @@ package com.snowplowanalytics.snowplow.snowflake +import cats.effect.{Concurrent, Ref} import cats.implicits._ -import cats.{Functor, Monad, Monoid} +import cats.{Monad, Monoid, Show} import com.snowplowanalytics.snowplow.runtime.HealthProbe import com.snowplowanalytics.snowplow.runtime.HealthProbe.{Healthy, Unhealthy} -import com.snowplowanalytics.snowplow.snowflake.processing.SnowflakeHealth import com.snowplowanalytics.snowplow.sources.SourceAndAck -object AppHealth { +import scala.concurrent.duration.FiniteDuration - def isHealthy[F[_]: Monad]( - config: Config.HealthProbe, - source: SourceAndAck[F], - snowflakeHealth: SnowflakeHealth[F] - ): F[HealthProbe.Status] = - List( - sourceIsHealthy(config, source), - snowflakeHealth.state.get - ).foldA - - private def sourceIsHealthy[F[_]: Functor](config: Config.HealthProbe, source: SourceAndAck[F]): F[HealthProbe.Status] = - source.isHealthy(config.unhealthyLatency).map { +final class AppHealth[F[_]: Monad]( + unhealthyLatency: FiniteDuration, + source: SourceAndAck[F], + appManagedServices: Ref[F, Map[AppHealth.Service, Boolean]] +) { + + def status(): F[HealthProbe.Status] = + for { + sourceHealth <- getSourceHealth + servicesHealth <- getAppManagedServicesHealth + } yield (sourceHealth :: servicesHealth).combineAll + + def setServiceHealth(service: AppHealth.Service, isHealthy: Boolean): F[Unit] = + appManagedServices.update { currentHealth => + currentHealth.updated(service, isHealthy) + } + + private def getAppManagedServicesHealth: F[List[HealthProbe.Status]] = + appManagedServices.get.map { services => + services.map { + case (service, false) => HealthProbe.Unhealthy(show"$service is not healthy") + case _ => HealthProbe.Healthy + }.toList + } + + private def getSourceHealth: F[HealthProbe.Status] = + source.isHealthy(unhealthyLatency).map { case SourceAndAck.Healthy => HealthProbe.Healthy case unhealthy: SourceAndAck.Unhealthy => HealthProbe.Unhealthy(unhealthy.show) } @@ -34,3 +49,27 @@ object AppHealth { private implicit val healthMonoid: Monoid[HealthProbe.Status] = Monoid.instance(Healthy, combineHealth) } + +object AppHealth { + + sealed trait Service + + object Service { + case object Snowflake extends Service + case object BadSink extends Service + + implicit val show: Show[Service] = Show.show { + case Snowflake => "Snowflake" + case BadSink => "Bad sink" + } + } + + def init[F[_]: Concurrent]( + unhealthyLatency: FiniteDuration, + source: SourceAndAck[F], + initialHealth: Map[AppHealth.Service, Boolean] + ): F[AppHealth[F]] = + Ref + .of[F, Map[AppHealth.Service, Boolean]](initialHealth) + .map(appManaged => new AppHealth[F](unhealthyLatency, source, appManaged)) +} diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala index ff94041..d13879f 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala @@ -12,7 +12,8 @@ import cats.effect.{Async, Resource} import com.snowplowanalytics.iglu.core.SchemaCriterion import com.snowplowanalytics.snowplow.runtime.{AppInfo, HealthProbe} import com.snowplowanalytics.snowplow.sinks.Sink -import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, SnowflakeHealth, TableManager} +import com.snowplowanalytics.snowplow.snowflake.AppHealth.Service +import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, TableManager} import com.snowplowanalytics.snowplow.sources.SourceAndAck import org.http4s.blaze.client.BlazeClientBuilder import org.http4s.client.Client @@ -25,6 +26,7 @@ case class Environment[F[_]]( tableManager: TableManager[F], channel: Channel.Provider[F], metrics: Metrics[F], + appHealth: AppHealth[F], batching: Config.Batching, schemasToSkip: List[SchemaCriterion], badRowMaxSize: Int @@ -32,6 +34,11 @@ case class Environment[F[_]]( object Environment { + private val initialAppHealth: Map[Service, Boolean] = Map( + Service.Snowflake -> false, + Service.BadSink -> true + ) + def fromConfig[F[_]: Async, SourceConfig, SinkConfig]( config: Config[SourceConfig, SinkConfig], appInfo: AppInfo, @@ -40,19 +47,19 @@ object Environment { ): Resource[F, Environment[F]] = for { _ <- Sentry.capturingAnyException(appInfo, config.monitoring.sentry) - snowflakeHealth <- Resource.eval(SnowflakeHealth.initUnhealthy[F]) sourceAndAck <- Resource.eval(toSource(config.input)) + appHealth <- Resource.eval(AppHealth.init(config.monitoring.healthProbe.unhealthyLatency, sourceAndAck, initialAppHealth)) _ <- HealthProbe.resource( config.monitoring.healthProbe.port, - AppHealth.isHealthy(config.monitoring.healthProbe, sourceAndAck, snowflakeHealth) + appHealth.status() ) httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource monitoring <- Monitoring.create[F](config.monitoring.webhook, appInfo, httpClient) badSink <- toSink(config.output.bad.sink) metrics <- Resource.eval(Metrics.build(config.monitoring.metrics)) - tableManager <- Resource.eval(TableManager.make(config.output.good, snowflakeHealth, config.retries, monitoring)) + tableManager <- Resource.eval(TableManager.make(config.output.good, appHealth, config.retries, monitoring)) channelOpener <- Channel.opener(config.output.good, config.batching) - channelProvider <- Channel.provider(channelOpener, config.retries, snowflakeHealth, monitoring) + channelProvider <- Channel.provider(channelOpener, config.retries, appHealth, monitoring) } yield Environment( appInfo = appInfo, source = sourceAndAck, @@ -61,6 +68,7 @@ object Environment { tableManager = tableManager, channel = channelProvider, metrics = metrics, + appHealth = appHealth, batching = config.batching, schemasToSkip = config.skipSchemas, badRowMaxSize = config.output.bad.maxRecordSize diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Channel.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Channel.scala index f15b7a1..c8d16a0 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Channel.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Channel.scala @@ -9,7 +9,7 @@ package com.snowplowanalytics.snowplow.snowflake.processing import cats.effect.{Async, Poll, Resource, Sync} import cats.implicits._ -import com.snowplowanalytics.snowplow.snowflake.{Alert, Config, Monitoring} +import com.snowplowanalytics.snowplow.snowflake.{Alert, AppHealth, Config, Monitoring} import net.snowflake.ingest.streaming.internal.SnowsFlakePlowInterop import net.snowflake.ingest.streaming._ import net.snowflake.ingest.utils.{ErrorCode => SFErrorCode, ParameterProvider, SFException} @@ -105,7 +105,7 @@ object Channel { def provider[F[_]: Async]( opener: Opener[F], retries: Config.Retries, - health: SnowflakeHealth[F], + health: AppHealth[F], monitoring: Monitoring[F] ): Resource[F, Provider[F]] = Coldswap.make(openerToResource(opener, retries, health, monitoring)) @@ -113,7 +113,7 @@ object Channel { private def openerToResource[F[_]: Async]( opener: Opener[F], retries: Config.Retries, - health: SnowflakeHealth[F], + health: AppHealth[F], monitoring: Monitoring[F] ): Resource[F, Channel[F]] = { diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala index 496716b..560970e 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala @@ -25,7 +25,7 @@ import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Pr import com.snowplowanalytics.snowplow.badrows.Payload.{RawPayload => BadRowRawPayload} import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, TokenedEvents} import com.snowplowanalytics.snowplow.sinks.ListOfList -import com.snowplowanalytics.snowplow.snowflake.{Environment, Metrics} +import com.snowplowanalytics.snowplow.snowflake.{AppHealth, Environment, Metrics} import com.snowplowanalytics.snowplow.runtime.syntax.foldable._ import com.snowplowanalytics.snowplow.runtime.processing.BatchUp import com.snowplowanalytics.snowplow.loaders.transform.{BadRowsSerializer, Transform} @@ -216,23 +216,30 @@ object Processing { batch: BatchAfterTransform )( handleFailures: List[Channel.WriteFailure] => F[BatchAfterTransform] - ): F[BatchAfterTransform] = - if (batch.toBeInserted.isEmpty) - batch.pure[F] - else - Sync[F].untilDefinedM { - env.channel.opened - .use { channel => - channel.write(batch.toBeInserted.asIterable.map(_._2)) - } - .flatMap { - case Channel.WriteResult.ChannelIsInvalid => - // Reset the channel and immediately try again - env.channel.closed.use_.as(none) - case Channel.WriteResult.WriteFailures(notWritten) => - handleFailures(notWritten).map(Some(_)) - } + ): F[BatchAfterTransform] = { + val attempt: F[BatchAfterTransform] = + if (batch.toBeInserted.isEmpty) + batch.pure[F] + else + Sync[F].untilDefinedM { + env.channel.opened + .use { channel => + channel.write(batch.toBeInserted.asIterable.map(_._2)) + } + .flatMap { + case Channel.WriteResult.ChannelIsInvalid => + // Reset the channel and immediately try again + env.channel.closed.use_.as(none) + case Channel.WriteResult.WriteFailures(notWritten) => + handleFailures(notWritten).map(Some(_)) + } + } + + attempt + .onError { _ => + env.appHealth.setServiceHealth(AppHealth.Service.Snowflake, isHealthy = false) } + } /** * First attempt to write events with the Snowflake SDK @@ -344,7 +351,7 @@ object Processing { env.tableManager.addColumns(extraColsRequired.toList) } - private def sendFailedEvents[F[_]: Applicative]( + private def sendFailedEvents[F[_]: Sync]( env: Environment[F], badRowProcessor: BadRowProcessor ): Pipe[F, BatchAfterTransform, BatchAfterTransform] = @@ -352,7 +359,11 @@ object Processing { if (batch.badAccumulated.nonEmpty) { val serialized = batch.badAccumulated.mapUnordered(badRow => BadRowsSerializer.withMaxSize(badRow, badRowProcessor, env.badRowMaxSize)) - env.badSink.sinkSimple(serialized) + env.badSink + .sinkSimple(serialized) + .onError { _ => + env.appHealth.setServiceHealth(AppHealth.Service.BadSink, isHealthy = false) + } } else Applicative[F].unit } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeHealth.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeHealth.scala deleted file mode 100644 index 0a54628..0000000 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeHealth.scala +++ /dev/null @@ -1,20 +0,0 @@ -package com.snowplowanalytics.snowplow.snowflake.processing - -import cats.effect.{Concurrent, Ref} -import cats.implicits._ -import com.snowplowanalytics.snowplow.runtime.HealthProbe -import com.snowplowanalytics.snowplow.snowflake.processing.SnowflakeHealth.unhealthy - -final case class SnowflakeHealth[F[_]](state: Ref[F, HealthProbe.Status]) { - def setUnhealthy(): F[Unit] = state.set(unhealthy) - def setHealthy(): F[Unit] = state.set(HealthProbe.Healthy) -} - -object SnowflakeHealth { - private val unhealthy = HealthProbe.Unhealthy("Snowflake connection is not healthy") - - def initUnhealthy[F[_]: Concurrent]: F[SnowflakeHealth[F]] = - Ref - .of[F, HealthProbe.Status](unhealthy) - .map(SnowflakeHealth.apply) -} diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala index 5937bb5..7f33760 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala @@ -2,7 +2,7 @@ package com.snowplowanalytics.snowplow.snowflake.processing import cats.effect.Sync import cats.implicits._ -import com.snowplowanalytics.snowplow.snowflake.Config +import com.snowplowanalytics.snowplow.snowflake.{AppHealth, Config} import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger import retry._ @@ -12,17 +12,17 @@ object SnowflakeRetrying { private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] - def retryIndefinitely[F[_]: Sync: Sleep, A](snowflakeHealth: SnowflakeHealth[F], config: Config.Retries)(action: F[A]): F[A] = - retryUntilSuccessful(snowflakeHealth, config, action) <* - snowflakeHealth.setHealthy() + def retryIndefinitely[F[_]: Sync: Sleep, A](appHealth: AppHealth[F], config: Config.Retries)(action: F[A]): F[A] = + retryUntilSuccessful(appHealth, config, action) <* + appHealth.setServiceHealth(AppHealth.Service.Snowflake, isHealthy = true) private def retryUntilSuccessful[F[_]: Sync: Sleep, A]( - snowflakeHealth: SnowflakeHealth[F], + appHealth: AppHealth[F], config: Config.Retries, action: F[A] ): F[A] = action - .onError(_ => snowflakeHealth.setUnhealthy()) + .onError(_ => appHealth.setServiceHealth(AppHealth.Service.Snowflake, isHealthy = false)) .retryingOnAllErrors( policy = RetryPolicies.exponentialBackoff[F](config.backoff), onError = (error, details) => Logger[F].error(error)(s"Executing Snowflake command failed. ${extractRetryDetails(details)}") diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala index d2123a9..3a2d361 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala @@ -9,7 +9,7 @@ package com.snowplowanalytics.snowplow.snowflake.processing import cats.effect.{Async, Sync} import cats.implicits._ -import com.snowplowanalytics.snowplow.snowflake.{Alert, Config, JdbcTransactor, Monitoring} +import com.snowplowanalytics.snowplow.snowflake.{Alert, AppHealth, Config, JdbcTransactor, Monitoring} import doobie.implicits._ import doobie.{ConnectionIO, Fragment} import net.snowflake.client.jdbc.SnowflakeSQLException @@ -32,7 +32,7 @@ object TableManager { def make[F[_]: Async]( config: Config.Snowflake, - snowflakeHealth: SnowflakeHealth[F], + appHealth: AppHealth[F], retriesConfig: Config.Retries, monitoring: Monitoring[F] ): F[TableManager[F]] = @@ -40,7 +40,7 @@ object TableManager { new TableManager[F] { override def initializeEventsTable(): F[Unit] = - SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) { + SnowflakeRetrying.retryIndefinitely(appHealth, retriesConfig) { Logger[F].info(s"Opening JDBC connection to ${config.url.getJdbcUrl}") *> executeInitTableQuery() .onError { cause => @@ -49,7 +49,7 @@ object TableManager { } override def addColumns(columns: List[String]): F[Unit] = - SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) { + SnowflakeRetrying.retryIndefinitely(appHealth, retriesConfig) { Logger[F].info(s"Altering table to add columns [${columns.mkString(", ")}]") *> executeAddColumnsQuery(columns) .onError { cause => diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala index b7cd2ca..1788c58 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala @@ -9,13 +9,13 @@ package com.snowplowanalytics.snowplow.snowflake import cats.effect.IO import cats.effect.kernel.{Ref, Resource, Unique} -import org.http4s.client.Client -import fs2.Stream - -import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents} +import com.snowplowanalytics.snowplow.runtime.AppInfo import com.snowplowanalytics.snowplow.sinks.Sink +import com.snowplowanalytics.snowplow.snowflake.AppHealth.Service.{BadSink, Snowflake} import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, Coldswap, TableManager} -import com.snowplowanalytics.snowplow.runtime.AppInfo +import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents} +import fs2.Stream +import org.http4s.client.Client import scala.concurrent.duration.{DurationInt, FiniteDuration} @@ -23,6 +23,8 @@ case class MockEnvironment(state: Ref[IO, Vector[MockEnvironment.Action]], envir object MockEnvironment { + private val everythingHealthy: Map[AppHealth.Service, Boolean] = Map(Snowflake -> true, BadSink -> true) + sealed trait Action object Action { case object InitEventsTable extends Action @@ -38,30 +40,23 @@ object MockEnvironment { } import Action._ - /** - * Build a mock environment for testing - * - * @param inputs - * Input events to send into the environment. - * @param channelResponses - * Responses we want the `Channel` to return when someone calls `write` - * @return - * An environment and a Ref that records the actions make by the environment - */ - def build(inputs: List[TokenedEvents], channelResponses: List[Channel.WriteResult]): Resource[IO, MockEnvironment] = + def build(inputs: List[TokenedEvents], mocks: Mocks): Resource[IO, MockEnvironment] = for { state <- Resource.eval(Ref[IO].of(Vector.empty[Action])) - channelResource <- Resource.eval(testChannel(state, channelResponses)) + source = testSourceAndAck(inputs, state) + channelResource <- Resource.eval(testChannel(mocks.channelResponses, state)) channelColdswap <- Coldswap.make(channelResource) + appHealth <- Resource.eval(AppHealth.init(10.seconds, source, everythingHealthy)) } yield { val env = Environment( appInfo = appInfo, - source = testSourceAndAck(inputs, state), - badSink = testSink(state), + source = source, + badSink = testBadSink(mocks.badSinkResponse, state), httpClient = testHttpClient, tableManager = testTableManager(state), channel = channelColdswap, metrics = testMetrics(state), + appHealth = appHealth, batching = Config.Batching( maxBytes = 16000000, maxDelay = 10.seconds, @@ -73,6 +68,21 @@ object MockEnvironment { MockEnvironment(state, env) } + final case class Mocks( + channelResponses: List[Response[Channel.WriteResult]], + badSinkResponse: Response[Unit] + ) + + object Mocks { + val default: Mocks = Mocks(channelResponses = List.empty, badSinkResponse = Response.Success(())) + } + + sealed trait Response[+A] + object Response { + final case class Success[A](value: A) extends Response[A] + final case class ExceptionThrown(value: Throwable) extends Response[Nothing] + } + val appInfo = new AppInfo { def name = "snowflake-loader-test" def version = "0.0.0" @@ -105,48 +115,55 @@ object MockEnvironment { IO.pure(SourceAndAck.Healthy) } - private def testSink(ref: Ref[IO, Vector[Action]]): Sink[IO] = Sink[IO] { batch => - ref.update(_ :+ SentToBad(batch.asIterable.size)) - } + private def testBadSink(mockedResponse: Response[Unit], state: Ref[IO, Vector[Action]]): Sink[IO] = + Sink[IO] { batch => + mockedResponse match { + case Response.Success(_) => + state.update(_ :+ SentToBad(batch.asIterable.size)) + case Response.ExceptionThrown(value) => + IO.raiseError(value) + } + } private def testHttpClient: Client[IO] = Client[IO] { _ => Resource.raiseError[IO, Nothing, Throwable](new RuntimeException("http failure")) } - /** - * Mocked implementation of a `Channel` - * - * @param actionRef - * Global Ref used to accumulate actions that happened - * @param responses - * Responses that this mocked Channel should return each time someone calls `write`. If no - * responses given, then it will return with a successful response. - */ private def testChannel( - actionRef: Ref[IO, Vector[Action]], - responses: List[Channel.WriteResult] + mockedResponses: List[Response[Channel.WriteResult]], + actionRef: Ref[IO, Vector[Action]] ): IO[Resource[IO, Channel[IO]]] = - for { - responseRef <- Ref[IO].of(responses) - } yield { + Ref[IO].of(mockedResponses).map { responses => val make = actionRef.update(_ :+ OpenedChannel).as { new Channel[IO] { def write(rows: Iterable[Map[String, AnyRef]]): IO[Channel.WriteResult] = for { - response <- responseRef.modify { + response <- responses.modify { case head :: tail => (tail, head) - case Nil => (Nil, Channel.WriteResult.WriteFailures(Nil)) + case Nil => (Nil, Response.Success(Channel.WriteResult.WriteFailures(Nil))) } - _ <- response match { - case Channel.WriteResult.WriteFailures(failures) => - actionRef.update(_ :+ WroteRowsToSnowflake(rows.size - failures.size)) - case Channel.WriteResult.ChannelIsInvalid => - IO.unit - } - } yield response + writeResult <- response match { + case success: Response.Success[Channel.WriteResult] => + updateActions(actionRef, rows, success) *> IO(success.value) + case Response.ExceptionThrown(ex) => + IO.raiseError(ex) + } + } yield writeResult + + def updateActions( + state: Ref[IO, Vector[Action]], + rows: Iterable[Map[String, AnyRef]], + success: Response.Success[Channel.WriteResult] + ): IO[Unit] = + success.value match { + case Channel.WriteResult.WriteFailures(failures) => + state.update(_ :+ WroteRowsToSnowflake(rows.size - failures.size)) + case Channel.WriteResult.ChannelIsInvalid => + IO.unit + } + } } - Resource.make(make)(_ => actionRef.update(_ :+ ClosedChannel)) } diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProviderSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProviderSpec.scala index cd6a28f..63d3469 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProviderSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProviderSpec.scala @@ -14,10 +14,11 @@ import org.specs2.Specification import cats.effect.testing.specs2.CatsEffect import cats.effect.testkit.TestControl -import scala.concurrent.duration.DurationLong - -import com.snowplowanalytics.snowplow.snowflake.{Alert, Config, Monitoring} +import scala.concurrent.duration.{DurationLong, FiniteDuration} +import com.snowplowanalytics.snowplow.snowflake.{Alert, AppHealth, Config, Monitoring} import com.snowplowanalytics.snowplow.runtime.HealthProbe +import com.snowplowanalytics.snowplow.snowflake.AppHealth.Service.{BadSink, Snowflake} +import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck} class ChannelProviderSpec extends Specification with CatsEffect { import ChannelProviderSpec._ @@ -33,12 +34,12 @@ class ChannelProviderSpec extends Specification with CatsEffect { """ def e1 = control.flatMap { c => - val io = Channel.provider(c.channelOpener, retriesConfig, c.snowflakeHealth, c.monitoring).use_ + val io = Channel.provider(c.channelOpener, retriesConfig, c.appHealth, c.monitoring).use_ for { _ <- io state <- c.state.get - health <- c.snowflakeHealth.state.get + health <- c.appHealth.status() } yield List( state should beEqualTo(Vector()), health should beHealthy @@ -46,7 +47,7 @@ class ChannelProviderSpec extends Specification with CatsEffect { } def e2 = control.flatMap { c => - val io = Channel.provider(c.channelOpener, retriesConfig, c.snowflakeHealth, c.monitoring).use { provider => + val io = Channel.provider(c.channelOpener, retriesConfig, c.appHealth, c.monitoring).use { provider => provider.opened.use_ } @@ -58,7 +59,7 @@ class ChannelProviderSpec extends Specification with CatsEffect { for { _ <- io state <- c.state.get - health <- c.snowflakeHealth.state.get + health <- c.appHealth.status() } yield List( state should beEqualTo(expectedState), health should beHealthy @@ -66,7 +67,7 @@ class ChannelProviderSpec extends Specification with CatsEffect { } def e3 = control.flatMap { c => - val io = Channel.provider(c.channelOpener, retriesConfig, c.snowflakeHealth, c.monitoring).use { provider => + val io = Channel.provider(c.channelOpener, retriesConfig, c.appHealth, c.monitoring).use { provider => provider.opened.use { _ => goBOOM } @@ -80,7 +81,7 @@ class ChannelProviderSpec extends Specification with CatsEffect { for { _ <- io.voidError state <- c.state.get - health <- c.snowflakeHealth.state.get + health <- c.appHealth.status() } yield List( state should beEqualTo(expectedState), health should beHealthy @@ -93,7 +94,7 @@ class ChannelProviderSpec extends Specification with CatsEffect { def open: IO[Channel.CloseableChannel[IO]] = goBOOM } - val io = Channel.provider(throwingOpener, retriesConfig, c.snowflakeHealth, c.monitoring).use { provider => + val io = Channel.provider(throwingOpener, retriesConfig, c.appHealth, c.monitoring).use { provider => provider.opened.use_ } @@ -109,7 +110,7 @@ class ChannelProviderSpec extends Specification with CatsEffect { _ <- IO.sleep(4.minutes) _ <- fiber.cancel state <- c.state.get - health <- c.snowflakeHealth.state.get + health <- c.appHealth.status() } yield List( state should beEqualTo(expectedState), health should beUnhealthy @@ -125,7 +126,7 @@ class ChannelProviderSpec extends Specification with CatsEffect { } // Three concurrent fibers wanting to open the channel: - val io = Channel.provider(throwingOpener, retriesConfig, c.snowflakeHealth, c.monitoring).use { provider => + val io = Channel.provider(throwingOpener, retriesConfig, c.appHealth, c.monitoring).use { provider => Supervisor[IO](await = false).use { supervisor => supervisor.supervise(provider.opened.surround(IO.never)) *> supervisor.supervise(provider.opened.surround(IO.never)) *> @@ -145,7 +146,7 @@ class ChannelProviderSpec extends Specification with CatsEffect { fiber <- io.start _ <- IO.sleep(4.minutes) state <- c.state.get - health <- c.snowflakeHealth.state.get + health <- c.appHealth.status() _ <- fiber.cancel } yield List( state should beEqualTo(expectedState), @@ -169,7 +170,7 @@ class ChannelProviderSpec extends Specification with CatsEffect { } val io = throwingOnceOpener.flatMap { channelOpener => - Channel.provider(channelOpener, retriesConfig, c.snowflakeHealth, c.monitoring).use { provider => + Channel.provider(channelOpener, retriesConfig, c.appHealth, c.monitoring).use { provider => provider.opened.use_ } } @@ -183,7 +184,7 @@ class ChannelProviderSpec extends Specification with CatsEffect { val test = for { _ <- io state <- c.state.get - health <- c.snowflakeHealth.state.get + health <- c.appHealth.status() } yield List( state should beEqualTo(expectedState), health should beHealthy @@ -223,7 +224,7 @@ object ChannelProviderSpec { case class Control( state: Ref[IO, Vector[Action]], channelOpener: Channel.Opener[IO], - snowflakeHealth: SnowflakeHealth[IO], + appHealth: AppHealth[IO], monitoring: Monitoring[IO] ) @@ -232,9 +233,20 @@ object ChannelProviderSpec { def control: IO[Control] = for { state <- Ref[IO].of(Vector.empty[Action]) - snowflakeHealth <- SnowflakeHealth.initUnhealthy[IO] - _ <- snowflakeHealth.setHealthy() // Simulate the health state after the table has been created - } yield Control(state, testChannelOpener(state), snowflakeHealth, testMonitoring(state)) + appHealth <- testAppHealth() + } yield Control(state, testChannelOpener(state), appHealth, testMonitoring(state)) + + private def testAppHealth(): IO[AppHealth[IO]] = { + val everythingHealthy: Map[AppHealth.Service, Boolean] = Map(Snowflake -> true, BadSink -> true) + val healthySource = new SourceAndAck[IO] { + override def stream(config: EventProcessingConfig, processor: EventProcessor[IO]): fs2.Stream[IO, Nothing] = + fs2.Stream.empty + + override def isHealthy(maxAllowedProcessingLatency: FiniteDuration): IO[SourceAndAck.HealthStatus] = + IO(SourceAndAck.Healthy) + } + AppHealth.init(10.seconds, healthySource, everythingHealthy) + } private def testChannelOpener(state: Ref[IO, Vector[Action]]): Channel.Opener[IO] = new Channel.Opener[IO] { diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala index cb2da56..7f0cdb6 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala @@ -8,22 +8,22 @@ package com.snowplowanalytics.snowplow.snowflake.processing import cats.effect.IO -import fs2.{Chunk, Stream} -import org.specs2.Specification import cats.effect.testing.specs2.CatsEffect import cats.effect.testkit.TestControl +import com.snowplowanalytics.snowplow.analytics.scalasdk.Event +import com.snowplowanalytics.snowplow.runtime.HealthProbe +import com.snowplowanalytics.snowplow.snowflake.MockEnvironment +import com.snowplowanalytics.snowplow.snowflake.MockEnvironment.{Action, Mocks, Response} +import com.snowplowanalytics.snowplow.sources.TokenedEvents +import fs2.{Chunk, Stream} import net.snowflake.ingest.utils.{ErrorCode, SFException} +import org.specs2.Specification -import java.nio.charset.StandardCharsets import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import java.time.Instant import scala.concurrent.duration.DurationLong -import com.snowplowanalytics.snowplow.analytics.scalasdk.Event -import com.snowplowanalytics.snowplow.snowflake.MockEnvironment -import com.snowplowanalytics.snowplow.snowflake.MockEnvironment.Action -import com.snowplowanalytics.snowplow.sources.TokenedEvents - class ProcessingSpec extends Specification with CatsEffect { import ProcessingSpec._ @@ -37,10 +37,13 @@ class ProcessingSpec extends Specification with CatsEffect { Abort processing and don't ack events when the Channel reports a runtime error $e6 Reset the Channel when the Channel reports the channel has become invalid $e7 Set the latency metric based off the message timestamp $e8 + Mark app as unhealthy when sinking badrows fails $e9 + Mark app as unhealthy when writing to the Channel fails with runtime exception $e10 + Mark app as unhealthy when writing to the Channel fails SDK internal error exception $e11 """ def e1 = - setup(generateEvents.take(2).compile.toList) { case (inputs, control) => + runTest(inputEvents(count = 2, good)) { case (inputs, control) => for { _ <- Processing.stream(control.environment).compile.drain state <- control.state.get @@ -57,7 +60,7 @@ class ProcessingSpec extends Specification with CatsEffect { } def e2 = - setup(generateBadlyFormatted.take(3).compile.toList) { case (inputs, control) => + runTest(inputEvents(count = 3, badlyFormatted)) { case (inputs, control) => for { _ <- Processing.stream(control.environment).compile.drain state <- control.state.get @@ -74,12 +77,12 @@ class ProcessingSpec extends Specification with CatsEffect { def e3 = { val toInputs = for { - bads <- generateBadlyFormatted.take(3).compile.toList - goods <- generateEvents.take(3).compile.toList + bads <- inputEvents(count = 3, badlyFormatted) + goods <- inputEvents(count = 3, good) } yield bads.zip(goods).map { case (bad, good) => TokenedEvents(bad.events ++ good.events, good.ack, None) } - setup(toInputs) { case (inputs, control) => + runTest(toInputs) { case (inputs, control) => for { _ <- Processing.stream(control.environment).compile.drain state <- control.state.get @@ -98,16 +101,20 @@ class ProcessingSpec extends Specification with CatsEffect { } def e4 = { - val mockedChannelResponses = List( - Channel.WriteResult.WriteFailures( - List( - Channel.WriteFailure(0L, List("unstruct_event_xyz_1", "contexts_abc_2"), new SFException(ErrorCode.INVALID_FORMAT_ROW)) - ) - ), - Channel.WriteResult.WriteFailures(Nil) + val mocks = Mocks.default.copy( + channelResponses = List( + Response.Success( + Channel.WriteResult.WriteFailures( + List( + Channel.WriteFailure(0L, List("unstruct_event_xyz_1", "contexts_abc_2"), new SFException(ErrorCode.INVALID_FORMAT_ROW)) + ) + ) + ), + Response.Success(Channel.WriteResult.WriteFailures(Nil)) + ) ) - setup(generateEvents.take(1).compile.toList, mockedChannelResponses) { case (inputs, control) => + runTest(inputEvents(count = 1, good), mocks) { case (inputs, control) => for { _ <- Processing.stream(control.environment).compile.drain state <- control.state.get @@ -129,16 +136,20 @@ class ProcessingSpec extends Specification with CatsEffect { } def e5 = { - val mockedChannelResponses = List( - Channel.WriteResult.WriteFailures( - List( - Channel.WriteFailure(0L, Nil, new SFException(ErrorCode.INVALID_FORMAT_ROW)) - ) - ), - Channel.WriteResult.WriteFailures(Nil) + val mocks = Mocks.default.copy( + channelResponses = List( + Response.Success( + Channel.WriteResult.WriteFailures( + List( + Channel.WriteFailure(0L, Nil, new SFException(ErrorCode.INVALID_FORMAT_ROW)) + ) + ) + ), + Response.Success(Channel.WriteResult.WriteFailures(Nil)) + ) ) - setup(generateEvents.take(1).compile.toList, mockedChannelResponses) { case (inputs, control) => + runTest(inputEvents(count = 1, good), mocks) { case (inputs, control) => for { _ <- Processing.stream(control.environment).compile.drain state <- control.state.get @@ -157,16 +168,20 @@ class ProcessingSpec extends Specification with CatsEffect { } def e6 = { - val mockedChannelResponses = List( - Channel.WriteResult.WriteFailures( - List( - Channel.WriteFailure(0L, Nil, new SFException(ErrorCode.INTERNAL_ERROR)) - ) - ), - Channel.WriteResult.WriteFailures(Nil) + val mocks = Mocks.default.copy( + channelResponses = List( + Response.Success( + Channel.WriteResult.WriteFailures( + List( + Channel.WriteFailure(0L, Nil, new SFException(ErrorCode.INTERNAL_ERROR)) + ) + ) + ), + Response.Success(Channel.WriteResult.WriteFailures(Nil)) + ) ) - setup(generateEvents.take(1).compile.toList, mockedChannelResponses) { case (_, control) => + runTest(inputEvents(count = 1, good), mocks) { case (_, control) => for { _ <- Processing.stream(control.environment).compile.drain.handleError(_ => ()) state <- control.state.get @@ -181,12 +196,14 @@ class ProcessingSpec extends Specification with CatsEffect { } def e7 = { - val mockedChannelResponses = List( - Channel.WriteResult.ChannelIsInvalid, - Channel.WriteResult.WriteFailures(Nil) + val mocks = Mocks.default.copy( + channelResponses = List( + Response.Success(Channel.WriteResult.ChannelIsInvalid), + Response.Success(Channel.WriteResult.WriteFailures(Nil)) + ) ) - setup(generateEvents.take(1).compile.toList, mockedChannelResponses) { case (inputs, control) => + runTest(inputEvents(count = 1, good), mocks) { case (inputs, control) => for { _ <- Processing.stream(control.environment).compile.drain state <- control.state.get @@ -209,13 +226,13 @@ class ProcessingSpec extends Specification with CatsEffect { val messageTime = Instant.parse("2023-10-24T10:00:00.000Z") val processTime = Instant.parse("2023-10-24T10:00:42.123Z") - val toInputs = generateEvents.take(2).compile.toList.map { + val toInputs = inputEvents(count = 2, good).map { _.map { _.copy(earliestSourceTstamp = Some(messageTime)) } } - val io = setup(toInputs) { case (inputs, control) => + val io = runTest(toInputs) { case (inputs, control) => for { _ <- IO.sleep(processTime.toEpochMilli.millis) _ <- Processing.stream(control.environment).compile.drain @@ -238,45 +255,96 @@ class ProcessingSpec extends Specification with CatsEffect { } + def e9 = { + val mocks = Mocks.default.copy( + badSinkResponse = Response.ExceptionThrown(new RuntimeException("Some error when sinking bad data")) + ) + + runTest(inputEvents(count = 1, badlyFormatted), mocks) { case (_, control) => + for { + _ <- Processing.stream(control.environment).compile.drain.voidError + healthStatus <- control.environment.appHealth.status() + } yield healthStatus should beEqualTo(HealthProbe.Unhealthy("Bad sink is not healthy")) + } + } + + def e10 = { + val mocks = Mocks.default.copy( + channelResponses = List(Response.ExceptionThrown(new RuntimeException("Some error when writing to the Channel"))) + ) + + runTest(inputEvents(count = 1, good), mocks) { case (_, control) => + for { + _ <- Processing.stream(control.environment).compile.drain.voidError + healthStatus <- control.environment.appHealth.status() + } yield healthStatus should beEqualTo(HealthProbe.Unhealthy("Snowflake is not healthy")) + } + } + + def e11 = { + val mocks = Mocks.default.copy( + channelResponses = List( + Response.Success( + Channel.WriteResult.WriteFailures( + List( + Channel.WriteFailure(0L, List.empty, new SFException(ErrorCode.INTERNAL_ERROR)) + ) + ) + ) + ) + ) + + runTest(inputEvents(count = 1, good), mocks) { case (_, control) => + for { + _ <- Processing.stream(control.environment).compile.drain.voidError + healthStatus <- control.environment.appHealth.status() + } yield healthStatus should beEqualTo(HealthProbe.Unhealthy("Snowflake is not healthy")) + } + } + } object ProcessingSpec { - def setup[A]( + def runTest[A]( toInputs: IO[List[TokenedEvents]], - channelResponses: List[Channel.WriteResult] = Nil + mocks: Mocks = Mocks.default )( f: (List[TokenedEvents], MockEnvironment) => IO[A] ): IO[A] = toInputs.flatMap { inputs => - MockEnvironment.build(inputs, channelResponses).use { control => + MockEnvironment.build(inputs, mocks).use { control => f(inputs, control) } } - def generateEvents: Stream[IO, TokenedEvents] = - Stream.eval { - for { - ack <- IO.unique - eventId1 <- IO.randomUUID - eventId2 <- IO.randomUUID - collectorTstamp <- IO.realTimeInstant - } yield { - val event1 = Event.minimal(eventId1, collectorTstamp, "0.0.0", "0.0.0") - val event2 = Event.minimal(eventId2, collectorTstamp, "0.0.0", "0.0.0") - val serialized = Chunk(event1, event2).map { e => - ByteBuffer.wrap(e.toTsv.getBytes(StandardCharsets.UTF_8)) - } - TokenedEvents(serialized, ack, None) - } - }.repeat + def inputEvents(count: Long, source: IO[TokenedEvents]): IO[List[TokenedEvents]] = + Stream + .eval(source) + .repeat + .take(count) + .compile + .toList - def generateBadlyFormatted: Stream[IO, TokenedEvents] = - Stream.eval { - IO.unique.map { token => - val serialized = Chunk("nonsense1", "nonsense2").map(s => ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) - TokenedEvents(serialized, token, None) + def good: IO[TokenedEvents] = + for { + ack <- IO.unique + eventId1 <- IO.randomUUID + eventId2 <- IO.randomUUID + collectorTstamp <- IO.realTimeInstant + } yield { + val event1 = Event.minimal(eventId1, collectorTstamp, "0.0.0", "0.0.0") + val event2 = Event.minimal(eventId2, collectorTstamp, "0.0.0", "0.0.0") + val serialized = Chunk(event1, event2).map { e => + ByteBuffer.wrap(e.toTsv.getBytes(StandardCharsets.UTF_8)) } - }.repeat + TokenedEvents(serialized, ack, None) + } + + def badlyFormatted: IO[TokenedEvents] = + IO.unique.map { token => + val serialized = Chunk("nonsense1", "nonsense2").map(s => ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) + TokenedEvents(serialized, token, None) + } }