diff --git a/config/config.azure.reference.hocon b/config/config.azure.reference.hocon
index d4b15cf..a663b20 100644
--- a/config/config.azure.reference.hocon
+++ b/config/config.azure.reference.hocon
@@ -78,6 +78,12 @@
     # - How many batches can we send simultaneously over the network to Snowflake.
     "uploadConcurrency": 1
   }
+
+  # Retry configuration for Snowflake operation failures
+  "retries": {
+    # Starting backoff period
+    "backoff": "30 seconds"
+  }
 
   # -- Schemas that won't be loaded to Snowflake. Optional, default value []
   "skipSchemas": [
diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon
index a9d5c19..d71975f 100644
--- a/config/config.kinesis.reference.hocon
+++ b/config/config.kinesis.reference.hocon
@@ -99,6 +99,12 @@
     # - How many batches can we send simultaneously over the network to Snowflake.
     "uploadConcurrency": 1
   }
+
+  # Retry configuration for Snowflake operation failures
+  "retries": {
+    # Starting backoff period
+    "backoff": "30 seconds"
+  }
 
   # -- Schemas that won't be loaded to Snowflake. Optional, default value []
   "skipSchemas": [
diff --git a/config/config.pubsub.reference.hocon b/config/config.pubsub.reference.hocon
index 3f879f7..af26952 100644
--- a/config/config.pubsub.reference.hocon
+++ b/config/config.pubsub.reference.hocon
@@ -79,19 +79,12 @@
     # - How many batches can we send simultaneously over the network to Snowflake.
     "uploadConcurrency": 1
   }
-
-  "batching": {
-
-    # - Events are emitted to Snowflake when the batch reaches this size in bytes
-    "maxBytes": 16000000
-
-    # - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
-    "maxDelay": "1 second"
-
-    # - How many batches can we send simultaneously over the network to Snowflake.
-    # - The Snowflake SDK dictates this should be kept below the number of available runtime processors (cpu)
-    "uploadConcurrency": 1
-  }
+
+  # Retry configuration for Snowflake operation failures
+  "retries": {
+    # Starting backoff period
+    "backoff": "30 seconds"
+  }
 
   # -- Schemas that won't be loaded to Snowflake. Optional, default value []
   "skipSchemas": [
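Note on the new `retries.backoff` setting: it is the starting delay of the exponential retry policy introduced further down in this diff (`RetryPolicies.exponentialBackoff`), and no retry cap is configured. A minimal sketch of how a 30-second starting backoff grows per attempt, assuming the usual delay = backoff × 2^retries doubling:

```scala
import scala.concurrent.duration._

// Illustrative only: cats-retry's exponentialBackoff doubles the delay on each
// retry, so a 30 second starting backoff produces 30s, 60s, 120s, 240s, ...
object BackoffProgression extends App {
  val backoff: FiniteDuration = 30.seconds
  val delays = (0 until 5).map(retriesSoFar => backoff * math.pow(2, retriesSoFar.toDouble).toLong)
  println(delays.mkString(", ")) // 30 seconds, 1 minute, 2 minutes, 4 minutes, 8 minutes
}
```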
diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf
index 2a8f11b..eaabbe9 100644
--- a/modules/core/src/main/resources/reference.conf
+++ b/modules/core/src/main/resources/reference.conf
@@ -16,6 +16,10 @@
       "uploadConcurrency": 3
     }
 
+    "retries": {
+      "backoff": "30 seconds"
+    }
+
     "skipSchemas": []
 
     "monitoring": {
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala
new file mode 100644
index 0000000..236d0bc
--- /dev/null
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala
@@ -0,0 +1,38 @@
+package com.snowplowanalytics.snowplow.snowflake
+
+import cats.implicits._
+import cats.{Functor, Monad, Monoid}
+import com.snowplowanalytics.snowplow.runtime.HealthProbe
+import com.snowplowanalytics.snowplow.runtime.HealthProbe.{Healthy, Unhealthy}
+import com.snowplowanalytics.snowplow.snowflake.processing.SnowflakeHealth
+import com.snowplowanalytics.snowplow.sources.SourceAndAck
+
+object AppHealth {
+
+  def isHealthy[F[_]: Monad](
+    config: Config.HealthProbe,
+    source: SourceAndAck[F],
+    snowflakeHealth: SnowflakeHealth[F]
+  ): F[HealthProbe.Status] =
+    List(
+      latencyHealth(config, source),
+      snowflakeHealth.state.get
+    ).foldA
+
+  private def latencyHealth[F[_]: Functor](config: Config.HealthProbe, source: SourceAndAck[F]): F[HealthProbe.Status] =
+    source.processingLatency.map { latency =>
+      if (latency > config.unhealthyLatency)
+        Unhealthy(show"Processing latency is $latency")
+      else
+        Healthy
+    }
+
+  private val combineHealth: (HealthProbe.Status, HealthProbe.Status) => HealthProbe.Status = {
+    case (Healthy, Healthy) => Healthy
+    case (Healthy, unhealthy) => unhealthy
+    case (unhealthy, Healthy) => unhealthy
+    case (Unhealthy(first), Unhealthy(second)) => Unhealthy(reason = s"$first, $second")
+  }
+
+  private implicit val healthMonoid: Monoid[HealthProbe.Status] = Monoid.instance(Healthy, combineHealth)
+}
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala
index ce23c0b..fe5203c 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala
@@ -26,6 +26,7 @@ case class Config[+Source, +Sink](
   input: Source,
   output: Config.Output[Sink],
   batching: Config.Batching,
+  retries: Config.Retries,
   skipSchemas: List[SchemaCriterion],
   telemetry: Telemetry.Config,
   monitoring: Config.Monitoring
@@ -75,6 +76,8 @@ object Config {
     healthProbe: HealthProbe
   )
 
+  case class Retries(backoff: FiniteDuration)
+
   implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
     implicit val configuration = Configuration.default.withDiscriminator("type")
     implicit val urlDecoder = Decoder.decodeString.emapTry { str =>
@@ -93,6 +96,7 @@ object Config {
     implicit val metricsDecoder = deriveConfiguredDecoder[Metrics]
     implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe]
     implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
+    implicit val retriesDecoder = deriveConfiguredDecoder[Retries]
 
     deriveConfiguredDecoder[Config[Source, Sink]]
   }
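`AppHealth.isHealthy` above relies on the `Monoid[HealthProbe.Status]` instance: `List(...).foldA` sequences the effectful checks and combines their results, so a single `Unhealthy` member makes the whole app unhealthy and reasons are concatenated. A self-contained sketch of the same pattern, using a simplified `Status` stand-in rather than the real `HealthProbe.Status` from the runtime library:

```scala
import cats.{Applicative, Monoid}
import cats.implicits._

object FoldStatusSketch extends App {
  // Simplified stand-in for HealthProbe.Status
  sealed trait Status
  case object Healthy extends Status
  final case class Unhealthy(reason: String) extends Status

  // Same shape as the healthMonoid in AppHealth: Healthy is the identity,
  // and two Unhealthy values merge their reasons.
  implicit val statusMonoid: Monoid[Status] = Monoid.instance[Status](
    Healthy,
    {
      case (Healthy, Healthy)           => Healthy
      case (Healthy, u)                 => u
      case (u, Healthy)                 => u
      case (Unhealthy(a), Unhealthy(b)) => Unhealthy(s"$a, $b")
    }
  )

  // foldA sequences the effects and |+|-combines the results, like isHealthy does.
  def overall[F[_]: Applicative](checks: List[F[Status]]): F[Status] =
    checks.foldA

  val checks: List[Option[Status]] = List(Some(Healthy), Some(Unhealthy("Snowflake connection is not healthy")))
  println(overall(checks)) // Some(Unhealthy(Snowflake connection is not healthy))
}
```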
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala
index 6993bcc..4cca944 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala
@@ -7,18 +7,17 @@
  */
 package com.snowplowanalytics.snowplow.snowflake
 
-import cats.implicits._
-import cats.Functor
-import cats.effect.{Async, Resource, Sync}
 import cats.effect.unsafe.implicits.global
+import cats.effect.{Async, Resource, Sync}
+import cats.implicits._
 import com.snowplowanalytics.iglu.core.SchemaCriterion
-import org.http4s.client.Client
-import org.http4s.blaze.client.BlazeClientBuilder
-import io.sentry.Sentry
-import com.snowplowanalytics.snowplow.sources.SourceAndAck
-import com.snowplowanalytics.snowplow.sinks.Sink
-import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, TableManager}
 import com.snowplowanalytics.snowplow.runtime.{AppInfo, HealthProbe}
+import com.snowplowanalytics.snowplow.sinks.Sink
+import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeHealth, SnowflakeRetrying, TableManager}
+import com.snowplowanalytics.snowplow.sources.SourceAndAck
+import io.sentry.Sentry
+import org.http4s.blaze.client.BlazeClientBuilder
+import org.http4s.client.Client
 
 case class Environment[F[_]](
   appInfo: AppInfo,
@@ -41,16 +40,21 @@
     toSink: SinkConfig => Resource[F, Sink[F]]
   ): Resource[F, Environment[F]] =
     for {
+      snowflakeHealth <- Resource.eval(SnowflakeHealth.initUnhealthy[F])
+      sourceAndAck <- Resource.eval(toSource(config.input))
+      _ <- HealthProbe.resource(
+             config.monitoring.healthProbe.port,
+             AppHealth.isHealthy(config.monitoring.healthProbe, sourceAndAck, snowflakeHealth)
+           )
       _ <- enableSentry[F](appInfo, config.monitoring.sentry)
       httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
       badSink <- toSink(config.output.bad)
       metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
       xa <- Resource.eval(SQLUtils.transactor[F](config.output.good))
-      _ <- Resource.eval(SQLUtils.createTable(config.output.good, xa))
-      tblManager = TableManager.fromTransactor(config.output.good, xa)
-      channelProvider <- ChannelProvider.make(config.output.good, config.batching)
-      sourceAndAck <- Resource.eval(toSource(config.input))
-      _ <- HealthProbe.resource(config.monitoring.healthProbe.port, isHealthy(config.monitoring.healthProbe, sourceAndAck))
+      _ <- Resource.eval(SnowflakeRetrying.retryIndefinitely(snowflakeHealth, config.retries)(SQLUtils.createTable(config.output.good, xa)))
+      tblManager = TableManager.fromTransactor(config.output.good, xa, snowflakeHealth, config.retries)
+      channelProvider <- ChannelProvider.make(config.output.good, snowflakeHealth, config.batching, config.retries)
+
     } yield Environment(
       appInfo = appInfo,
       source = sourceAndAck,
@@ -83,13 +87,4 @@
 
       case None => Resource.unit[F]
     }
-
-  private def isHealthy[F[_]: Functor](config: Config.HealthProbe, source: SourceAndAck[F]): F[HealthProbe.Status] =
-    source.processingLatency.map { latency =>
-      if (latency > config.unhealthyLatency)
-        HealthProbe.Unhealthy(show"Processing latency is $latency")
-      else
-        HealthProbe.Healthy
-    }
-
 }
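The reordering in `Environment.fromConfig` is deliberate: the source and the health-probe server are acquired before any Snowflake resource, and `createTable` is wrapped in `retryIndefinitely`, so the probe can already answer "unhealthy" while startup is still retrying and only flips to healthy once initialisation succeeds. A minimal cats-effect sketch of that ordering idea (not the real wiring; the names below are invented):

```scala
import cats.effect.{IO, IOApp, Ref, Resource}
import scala.concurrent.duration._

object StartupOrderSketch extends IOApp.Simple {
  def run: IO[Unit] = {
    val resources = for {
      // 1. health state and "probe" come up first, initially unhealthy
      state <- Resource.eval(Ref.of[IO, String]("unhealthy"))
      _     <- Resource.eval(state.get.flatMap(s => IO.println(s"probe online, reporting: $s")))
      // 2. stand-in for the retried Snowflake initialisation (createTable / openChannel)
      _     <- Resource.eval(IO.sleep(50.millis) *> state.set("healthy"))
      _     <- Resource.eval(state.get.flatMap(s => IO.println(s"init done, reporting: $s")))
    } yield ()
    resources.use_
  }
}
```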
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala
index 46fff90..07da9b4 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala
@@ -107,14 +107,19 @@
   /** A large number so we don't limit the number of permits for calls to `flush` and `enqueue` */
   private val allAvailablePermits: Long = Long.MaxValue
 
-  def make[F[_]: Async](config: Config.Snowflake, batchingConfig: Config.Batching): Resource[F, ChannelProvider[F]] =
+  def make[F[_]: Async](
+    config: Config.Snowflake,
+    snowflakeHealth: SnowflakeHealth[F],
+    batchingConfig: Config.Batching,
+    retriesConfig: Config.Retries
+  ): Resource[F, ChannelProvider[F]] =
     for {
       client <- createClient(config, batchingConfig)
-      hs <- Hotswap.create[F, SnowflakeStreamingIngestChannel]
-      channel <- Resource.eval(hs.swap(createChannel(config, client)))
+      channelResource = createChannel(config, client, snowflakeHealth, retriesConfig)
+      (hs, channel) <- Hotswap.apply(channelResource)
       ref <- Resource.eval(Ref[F].of(channel))
       sem <- Resource.eval(Semaphore[F](allAvailablePermits))
-    } yield impl(ref, hs, sem, createChannel(config, client))
+    } yield impl(ref, hs, sem, channelResource)
 
   private def impl[F[_]: Async](
     ref: Ref[F, SnowflakeStreamingIngestChannel],
@@ -189,7 +194,9 @@
   private def createChannel[F[_]: Async](
     config: Config.Snowflake,
-    client: SnowflakeStreamingIngestClient
+    client: SnowflakeStreamingIngestClient,
+    snowflakeHealth: SnowflakeHealth[F],
+    retriesConfig: Config.Retries
   ): Resource[F, SnowflakeStreamingIngestChannel] = {
     val request = OpenChannelRequest
       .builder(config.channel)
@@ -201,7 +208,9 @@
       .build
 
     val make = Logger[F].info(s"Opening channel ${config.channel}") *>
-      Async[F].blocking(client.openChannel(request))
+      SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) {
+        Async[F].blocking(client.openChannel(request))
+      }
 
     Resource.make(make) { channel =>
       Logger[F].info(s"Closing channel ${config.channel}") *>
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeHealth.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeHealth.scala
new file mode 100644
index 0000000..0a54628
--- /dev/null
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeHealth.scala
@@ -0,0 +1,20 @@
+package com.snowplowanalytics.snowplow.snowflake.processing
+
+import cats.effect.{Concurrent, Ref}
+import cats.implicits._
+import com.snowplowanalytics.snowplow.runtime.HealthProbe
+import com.snowplowanalytics.snowplow.snowflake.processing.SnowflakeHealth.unhealthy
+
+final case class SnowflakeHealth[F[_]](state: Ref[F, HealthProbe.Status]) {
+  def setUnhealthy(): F[Unit] = state.set(unhealthy)
+  def setHealthy(): F[Unit] = state.set(HealthProbe.Healthy)
+}
+
+object SnowflakeHealth {
+  private val unhealthy = HealthProbe.Unhealthy("Snowflake connection is not healthy")
+
+  def initUnhealthy[F[_]: Concurrent]: F[SnowflakeHealth[F]] =
+    Ref
+      .of[F, HealthProbe.Status](unhealthy)
+      .map(SnowflakeHealth.apply)
+}
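In the `ChannelProvider` hunk above, `Hotswap.create` followed by an explicit `hs.swap(...)` for the first channel is replaced by `Hotswap.apply`, which acquires the initial resource and returns the `Hotswap` handle together with the first value; the same channel resource (now carrying the health and retry wiring) is reused for later swaps. Roughly, with a placeholder channel type:

```scala
import cats.effect.{IO, Resource}
import cats.effect.std.Hotswap

object HotswapSketch {
  // Placeholder for SnowflakeStreamingIngestChannel, just to show the shapes.
  type Channel = String

  // Before: create an empty Hotswap, then swap in the first channel explicitly.
  def before(channelResource: Resource[IO, Channel]): Resource[IO, (Hotswap[IO, Channel], Channel)] =
    for {
      hs      <- Hotswap.create[IO, Channel]
      channel <- Resource.eval(hs.swap(channelResource))
    } yield (hs, channel)

  // After: Hotswap.apply acquires the initial resource and returns both pieces in one step.
  def after(channelResource: Resource[IO, Channel]): Resource[IO, (Hotswap[IO, Channel], Channel)] =
    Hotswap.apply(channelResource)
}
```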
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala
new file mode 100644
index 0000000..5937bb5
--- /dev/null
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala
@@ -0,0 +1,37 @@
+package com.snowplowanalytics.snowplow.snowflake.processing
+
+import cats.effect.Sync
+import cats.implicits._
+import com.snowplowanalytics.snowplow.snowflake.Config
+import org.typelevel.log4cats.Logger
+import org.typelevel.log4cats.slf4j.Slf4jLogger
+import retry._
+import retry.implicits.retrySyntaxError
+
+object SnowflakeRetrying {
+
+  private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]
+
+  def retryIndefinitely[F[_]: Sync: Sleep, A](snowflakeHealth: SnowflakeHealth[F], config: Config.Retries)(action: F[A]): F[A] =
+    retryUntilSuccessful(snowflakeHealth, config, action) <*
+      snowflakeHealth.setHealthy()
+
+  private def retryUntilSuccessful[F[_]: Sync: Sleep, A](
+    snowflakeHealth: SnowflakeHealth[F],
+    config: Config.Retries,
+    action: F[A]
+  ): F[A] =
+    action
+      .onError(_ => snowflakeHealth.setUnhealthy())
+      .retryingOnAllErrors(
+        policy = RetryPolicies.exponentialBackoff[F](config.backoff),
+        onError = (error, details) => Logger[F].error(error)(s"Executing Snowflake command failed. ${extractRetryDetails(details)}")
+      )
+
+  private def extractRetryDetails(details: RetryDetails): String = details match {
+    case RetryDetails.GivingUp(totalRetries, totalDelay) =>
+      s"Giving up on retrying, total retries: $totalRetries, total delay: ${totalDelay.toSeconds} seconds"
+    case RetryDetails.WillDelayAndRetry(nextDelay, retriesSoFar, cumulativeDelay) =>
+      s"Will retry in ${nextDelay.toSeconds} seconds, retries so far: $retriesSoFar, total delay so far: ${cumulativeDelay.toSeconds} seconds"
+  }
+}
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala
index 4372703..ca103c1 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala
@@ -31,10 +31,12 @@
   def fromTransactor[F[_]: Async](
     config: Config.Snowflake,
-    xa: Transactor[F]
+    xa: Transactor[F],
+    snowflakeHealth: SnowflakeHealth[F],
+    retriesConfig: Config.Retries
   ): TableManager[F] = new TableManager[F] {
 
-    def addColumns(columns: List[String]): F[Unit] =
+    def addColumns(columns: List[String]): F[Unit] = SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) {
       Logger[F].info(s"Altering table to add columns [${columns.mkString(", ")}]") *>
         xa.rawTrans.apply {
           columns.traverse_ { col =>
@@ -45,6 +47,7 @@
           }
         }
       }
+    }
   }
 
   private val reUnstruct: Regex = "^unstruct_event_.*$".r
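Taken together, `addColumns`, `createTable` and channel opening are now wrapped in `SnowflakeRetrying.retryIndefinitely`: the operation retries forever with exponential backoff, the health flag goes unhealthy on every error and healthy again only after a success. A compact, self-contained illustration of those semantics, with a plain `Ref[IO, Boolean]` standing in for `SnowflakeHealth` (the helper and names below are made up for the example):

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import cats.implicits._
import retry._
import retry.implicits._
import scala.concurrent.duration._

object RetrySemanticsSketch extends App {

  // Same shape as SnowflakeRetrying.retryIndefinitely: mark unhealthy on every
  // error, retry forever with exponential backoff, mark healthy after success.
  def retryIndefinitely[A](healthy: Ref[IO, Boolean], backoff: FiniteDuration)(action: IO[A]): IO[A] =
    action
      .onError { case _ => healthy.set(false) }
      .retryingOnAllErrors(
        policy = RetryPolicies.exponentialBackoff[IO](backoff),
        onError = (e, details) => IO.println(s"retrying after: ${e.getMessage} ($details)")
      ) <* healthy.set(true)

  val demo = for {
    healthy  <- Ref.of[IO, Boolean](false)
    attempts <- Ref.of[IO, Int](0)
    // fails twice, then succeeds
    flaky = attempts.updateAndGet(_ + 1).flatMap(n => IO.raiseError(new Exception(s"boom $n")).whenA(n < 3))
    _  <- retryIndefinitely(healthy, 10.millis)(flaky)
    ok <- healthy.get
    _  <- IO.println(s"healthy = $ok") // healthy = true
  } yield ()

  demo.unsafeRunSync()
}
```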