From b8d60aaa650ee13e56817f1c2b8fb1aad90273b5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Poniedzia=C5=82ek?=
Date: Mon, 4 Dec 2023 11:22:55 +0100
Subject: [PATCH] Don't crash app for Snowflake failures

---
Review notes (text between this marker and the first "diff --git" is not applied):
- SnowflakeActionRunner: retry delays are capped at 10 minutes and the implicit logger has
  an explicit result type.
- Environment: the source and the health probe are created before `createTable` runs.
  NOTE(review): assumes `processingLatency` can be polled before the stream starts - confirm.
- Line breaks and indentation were rebuilt from a whitespace-collapsed copy of the patch.
  Context-line whitespace is a best guess (try `git apply --ignore-whitespace`), and the
  `index` blob ids of the edited files are stale.

 .../AppHealth.scala                           | 46 +++++++++++++++
 .../Environment.scala                         | 41 +++++++------
 .../processing/Processing.scala               |  4 +-
 .../processing/SnowflakeActionRunner.scala    | 58 +++++++++++++++++++
 .../processing/SnowflakeHealth.scala          | 20 +++++++
 .../MockEnvironment.scala                     |  7 ++-
 6 files changed, 152 insertions(+), 24 deletions(-)
 create mode 100644 modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala
 create mode 100644 modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeActionRunner.scala
 create mode 100644 modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeHealth.scala

diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala
new file mode 100644
index 0000000..34cead9
--- /dev/null
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala
@@ -0,0 +1,46 @@
+package com.snowplowanalytics.snowplow.snowflake
+
+import cats.implicits._
+import cats.{Functor, Monad, Monoid}
+import com.snowplowanalytics.snowplow.runtime.HealthProbe
+import com.snowplowanalytics.snowplow.runtime.HealthProbe.{Healthy, Unhealthy}
+import com.snowplowanalytics.snowplow.snowflake.processing.SnowflakeHealth
+import com.snowplowanalytics.snowplow.sources.SourceAndAck
+
+/** Derives the status served by the health probe from the app's individual health signals. */
+object AppHealth {
+
+  /**
+   * Overall health: the source's processing latency check combined with the current Snowflake
+   * health state. Healthy only if both are healthy; if both are unhealthy the two reasons are
+   * joined into one.
+   */
+  def isHealthy[F[_]: Monad](
+    config: Config.HealthProbe,
+    source: SourceAndAck[F],
+    snowflakeHealth: SnowflakeHealth[F]
+  ): F[HealthProbe.Status] =
+    List(
+      latencyHealth(config, source),
+      snowflakeHealth.state.get
+    ).sequence.map(_.combineAll)
+
+  /** Unhealthy when the source's processing latency exceeds `config.unhealthyLatency`. */
+  private def latencyHealth[F[_]: Functor](config: Config.HealthProbe, source: SourceAndAck[F]): F[HealthProbe.Status] =
+    source.processingLatency.map { latency =>
+      if (latency > config.unhealthyLatency)
+        Unhealthy(show"Processing latency is $latency")
+      else
+        Healthy
+    }
+
+  private val combineHealth: (HealthProbe.Status, HealthProbe.Status) => HealthProbe.Status = {
+    case (Healthy, Healthy) => Healthy
+    case (Healthy, unhealthy) => unhealthy
+    case (unhealthy, Healthy) => unhealthy
+    case (Unhealthy(first), Unhealthy(second)) => Unhealthy(reason = s"$first, $second") // TODO do we want to combine reasons like that...?
+  }
+
+  // Lets `combineAll` fold a list of statuses; `Healthy` is the identity element.
+  private implicit val healthMonoid: Monoid[HealthProbe.Status] = Monoid.instance(Healthy, combineHealth)
+}
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala
index 6993bcc..f936398 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala
@@ -7,18 +7,17 @@
  */
 package com.snowplowanalytics.snowplow.snowflake
 
-import cats.implicits._
-import cats.Functor
-import cats.effect.{Async, Resource, Sync}
 import cats.effect.unsafe.implicits.global
+import cats.effect.{Async, Resource, Sync}
+import cats.implicits._
 import com.snowplowanalytics.iglu.core.SchemaCriterion
-import org.http4s.client.Client
-import org.http4s.blaze.client.BlazeClientBuilder
-import io.sentry.Sentry
-import com.snowplowanalytics.snowplow.sources.SourceAndAck
-import com.snowplowanalytics.snowplow.sinks.Sink
-import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, TableManager}
 import com.snowplowanalytics.snowplow.runtime.{AppInfo, HealthProbe}
+import com.snowplowanalytics.snowplow.sinks.Sink
+import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeActionRunner, SnowflakeHealth, TableManager}
+import com.snowplowanalytics.snowplow.sources.SourceAndAck
+import io.sentry.Sentry
+import org.http4s.blaze.client.BlazeClientBuilder
+import org.http4s.client.Client
 
 case class Environment[F[_]](
   appInfo: AppInfo,
@@ -28,6 +27,7 @@ case class Environment[F[_]](
   tblManager: TableManager[F],
   channelProvider: ChannelProvider[F],
   metrics: Metrics[F],
+  snowflakeRunner: SnowflakeActionRunner[F],
   batching: Config.Batching,
   schemasToSkip: List[SchemaCriterion]
 )
@@ -41,16 +41,23 @@ object Environment {
     toSink: SinkConfig => Resource[F, Sink[F]]
   ): Resource[F, Environment[F]] =
     for {
+      snowflakeHealth <- Resource.eval(SnowflakeHealth.initHealthy[F])
+      snowflakeRunner = SnowflakeActionRunner.retryingIndefinitely[F](snowflakeHealth)
       _ <- enableSentry[F](appInfo, config.monitoring.sentry)
+      // The source and the health probe are set up before `createTable` runs, so the probe is
+      // already serving `snowflakeHealth` while that call is being retried.
+      sourceAndAck <- Resource.eval(toSource(config.input))
+      _ <- HealthProbe.resource(
+             config.monitoring.healthProbe.port,
+             AppHealth.isHealthy(config.monitoring.healthProbe, sourceAndAck, snowflakeHealth)
+           )
       httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
       badSink <- toSink(config.output.bad)
       metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
       xa <- Resource.eval(SQLUtils.transactor[F](config.output.good))
-      _ <- Resource.eval(SQLUtils.createTable(config.output.good, xa))
+      _ <- Resource.eval(snowflakeRunner.run(SQLUtils.createTable(config.output.good, xa)))
       tblManager = TableManager.fromTransactor(config.output.good, xa)
       channelProvider <- ChannelProvider.make(config.output.good, config.batching)
-      sourceAndAck <- Resource.eval(toSource(config.input))
-      _ <- HealthProbe.resource(config.monitoring.healthProbe.port, isHealthy(config.monitoring.healthProbe, sourceAndAck))
     } yield Environment(
       appInfo = appInfo,
       source = sourceAndAck,
@@ -59,6 +66,7 @@ object Environment {
       tblManager = tblManager,
       channelProvider = channelProvider,
       metrics = metrics,
+      snowflakeRunner = snowflakeRunner,
       batching = config.batching,
       schemasToSkip = config.skipSchemas
     )
@@ -83,13 +91,4 @@ object Environment {
       case None =>
         Resource.unit[F]
     }
-
-  private def isHealthy[F[_]: Functor](config: Config.HealthProbe, source: SourceAndAck[F]): F[HealthProbe.Status] =
-    source.processingLatency.map { latency =>
-      if (latency > config.unhealthyLatency)
-        HealthProbe.Unhealthy(show"Processing latency is $latency")
-      else
-        HealthProbe.Healthy
-    }
-
 }
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala
index ca795e3..a44b828 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala
@@ -223,7 +223,7 @@ object Processing {
         batch.pure[F]
       else
         Sync[F].untilDefinedM {
-          env.channelProvider.write(batch.toBeInserted.asIterable.map(_._2)).flatMap {
+          env.snowflakeRunner.run(env.channelProvider.write(batch.toBeInserted.asIterable.map(_._2))).flatMap {
             case ChannelProvider.WriteResult.ChannelIsInvalid =>
               // Reset the channel and immediately try again
               env.channelProvider.reset.as(none)
@@ -339,7 +339,7 @@ object Processing {
       ().pure[F]
     else
       env.channelProvider.withClosedChannel {
-        env.tblManager.addColumns(extraColsRequired.toList)
+        env.snowflakeRunner.run(env.tblManager.addColumns(extraColsRequired.toList))
       }
 
   private def sendFailedEvents[F[_]: Applicative, A](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeActionRunner.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeActionRunner.scala
new file mode 100644
index 0000000..4ccc716
--- /dev/null
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeActionRunner.scala
@@ -0,0 +1,58 @@
+package com.snowplowanalytics.snowplow.snowflake.processing
+
+import cats.effect.Sync
+import cats.implicits._
+import net.snowflake.client.jdbc.SnowflakeSQLException
+import org.typelevel.log4cats.Logger
+import org.typelevel.log4cats.slf4j.Slf4jLogger
+import retry._
+import retry.implicits.retrySyntaxError
+
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
+
+/** Wraps an action that talks to Snowflake with a failure-handling strategy. */
+trait SnowflakeActionRunner[F[_]] {
+  def run[A](action: F[A]): F[A]
+}
+
+object SnowflakeActionRunner {
+
+  // Delay before the first retry; doubled after each further failure. TODO make it configurable
+  private val BaseDelay: FiniteDuration = 1.minute
+
+  // Ceiling for a single delay. Without it the doubling is unbounded (over four hours by the
+  // ninth consecutive failure), so the app could stay idle long after Snowflake has recovered.
+  private val MaxDelay: FiniteDuration = 10.minutes
+
+  // Explicit result type: Scala 3 rejects implicit definitions that leave it to inference.
+  private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F]
+
+  /**
+   * Runner that retries a failing action until it succeeds. Every failure marks `snowflakeHealth`
+   * as unhealthy and is logged; a success marks it as healthy again.
+   */
+  def retryingIndefinitely[F[_]: Sync: Sleep](snowflakeHealth: SnowflakeHealth[F]): SnowflakeActionRunner[F] =
+    new SnowflakeActionRunner[F] {
+      override def run[A](action: F[A]): F[A] =
+        retryUntilSuccessful(snowflakeHealth, action) <*
+          snowflakeHealth.setHealthy()
+    }
+
+  private def retryUntilSuccessful[F[_]: Sync: Sleep, A](snowflakeHealth: SnowflakeHealth[F], action: F[A]): F[A] =
+    action.retryingOnSomeErrors(
+      isWorthRetrying = isWorthRetrying(_).pure[F],
+      policy = RetryPolicies.capDelay[F](MaxDelay, RetryPolicies.exponentialBackoff[F](baseDelay = BaseDelay)),
+      onError = (ex, _) => handleError(snowflakeHealth, ex)
+    )
+
+  private def handleError[F[_]: Sync](snowflakeHealth: SnowflakeHealth[F], ex: Throwable): F[Unit] =
+    snowflakeHealth.setUnhealthy() *>
+      Logger[F].error(ex)("Executing Snowflake command failed")
+
+  private def isWorthRetrying(error: Throwable): Boolean = error match {
+    // TODO Retry for some specific error codes or simply for any snowflake exception?
+    case se: SnowflakeSQLException if se.getErrorCode === 2222 =>
+      true
+    case _ => true
+  }
+}
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeHealth.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeHealth.scala
new file mode 100644
index 0000000..335d629
--- /dev/null
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeHealth.scala
@@ -0,0 +1,20 @@
+package com.snowplowanalytics.snowplow.snowflake.processing
+
+import cats.effect.{Concurrent, Ref}
+import cats.implicits._
+import com.snowplowanalytics.snowplow.runtime.HealthProbe
+
+/** Holds the Snowflake-related health status in a `Ref` so it can be updated and read concurrently. */
+final case class SnowflakeHealth[F[_]](state: Ref[F, HealthProbe.Status]) {
+  def setUnhealthy(): F[Unit] = state.set(HealthProbe.Unhealthy("Snowflake is not healthy")) // TODO do we need more details here?
+  def setHealthy(): F[Unit] = state.set(HealthProbe.Healthy)
+}
+
+object SnowflakeHealth {
+
+  /** Creates the holder with an initial status of `Healthy`. */
+  def initHealthy[F[_]: Concurrent]: F[SnowflakeHealth[F]] =
+    Ref
+      .of[F, HealthProbe.Status](HealthProbe.Healthy)
+      .map(SnowflakeHealth.apply)
+}
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala
index 989edbf..c7131b0 100644
--- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala
@@ -14,7 +14,7 @@ import fs2.Stream
 
 import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}
 import com.snowplowanalytics.snowplow.sinks.Sink
-import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, TableManager}
+import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeActionRunner, TableManager}
 import com.snowplowanalytics.snowplow.runtime.AppInfo
 
 import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
@@ -60,6 +60,7 @@ object MockEnvironment {
         tblManager = testTableManager(state),
         channelProvider = channelProvider,
         metrics = testMetrics(state),
+        snowflakeRunner = testSnowflakeRunner(),
         batching = Config.Batching(
           maxBytes = 16000000,
           maxDelay = 10.seconds,
@@ -158,4 +159,8 @@ object MockEnvironment {
 
     def report: Stream[IO, Nothing] = Stream.never[IO]
   }
+
+  private def testSnowflakeRunner() = new SnowflakeActionRunner[IO] {
+    override def run[A](action: IO[A]): IO[A] = action
+  }
 }