diff --git a/config/config.azure.reference.hocon b/config/config.azure.reference.hocon index d4b15cf..a663b20 100644 --- a/config/config.azure.reference.hocon +++ b/config/config.azure.reference.hocon @@ -78,6 +78,12 @@ # - How many batches can we send simultaneously over the network to Snowflake. "uploadConcurrency": 1 } + + # Retry configuration for Snowflake operation failures + "retries": { + # Starting backoff period + "backoff": "30 seconds" + } # -- Schemas that won't be loaded to Snowflake. Optional, default value [] "skipSchemas": [ diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon index a9d5c19..d71975f 100644 --- a/config/config.kinesis.reference.hocon +++ b/config/config.kinesis.reference.hocon @@ -99,6 +99,12 @@ # - How many batches can we send simultaneously over the network to Snowflake. "uploadConcurrency": 1 } + + # Retry configuration for Snowflake operation failures + "retries": { + # Starting backoff period + "backoff": "30 seconds" + } # -- Schemas that won't be loaded to Snowflake. Optional, default value [] "skipSchemas": [ diff --git a/config/config.pubsub.reference.hocon b/config/config.pubsub.reference.hocon index 3f879f7..af26952 100644 --- a/config/config.pubsub.reference.hocon +++ b/config/config.pubsub.reference.hocon @@ -79,19 +79,12 @@ # - How many batches can we send simultaneously over the network to Snowflake. "uploadConcurrency": 1 } - - "batching": { - - # - Events are emitted to Snowflake when the batch reaches this size in bytes - "maxBytes": 16000000 - - # - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached - "maxDelay": "1 second" - - # - How many batches can we send simultaneously over the network to Snowflake. - # - The Snowflake SDK dictates this should be kept below the number of available runtime processors (cpu) - "uploadConcurrency": 1 - } + + # Retry configuration for Snowflake operation failures + "retries": { + # Starting backoff period + "backoff": "30 seconds" + } # -- Schemas that won't be loaded to Snowflake. Optional, default value [] "skipSchemas": [ diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf index 2a8f11b..eaabbe9 100644 --- a/modules/core/src/main/resources/reference.conf +++ b/modules/core/src/main/resources/reference.conf @@ -16,6 +16,10 @@ "uploadConcurrency": 3 } + "retries": { + "backoff": "30 seconds" + } + "skipSchemas": [] "monitoring": { diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala index 34cead9..6447757 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala @@ -31,7 +31,7 @@ object AppHealth { case (Healthy, Healthy) => Healthy case (Healthy, unhealthy) => unhealthy case (unhealthy, Healthy) => unhealthy - case (Unhealthy(first), Unhealthy(second)) => Unhealthy(reason = s"$first, $second") // TODO do we want to combine reasons like that...? + case (Unhealthy(first), Unhealthy(second)) => Unhealthy(reason = s"$first, $second") } private implicit val healthMonoid: Monoid[HealthProbe.Status] = Monoid.instance(Healthy, combineHealth) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala index ce23c0b..fe5203c 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala @@ -26,6 +26,7 @@ case class Config[+Source, +Sink]( input: Source, output: Config.Output[Sink], batching: Config.Batching, + retries: Config.Retries, skipSchemas: List[SchemaCriterion], telemetry: Telemetry.Config, monitoring: Config.Monitoring @@ -75,6 +76,8 @@ object Config { healthProbe: HealthProbe ) + case class Retries(backoff: FiniteDuration) + implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = { implicit val configuration = Configuration.default.withDiscriminator("type") implicit val urlDecoder = Decoder.decodeString.emapTry { str => @@ -93,6 +96,7 @@ object Config { implicit val metricsDecoder = deriveConfiguredDecoder[Metrics] implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe] implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring] + implicit val retriesDecoder = deriveConfiguredDecoder[Retries] deriveConfiguredDecoder[Config[Source, Sink]] } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala index 502225d..15cdd92 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala @@ -46,9 +46,9 @@ object Environment { badSink <- toSink(config.output.bad) metrics <- Resource.eval(Metrics.build(config.monitoring.metrics)) xa <- Resource.eval(SQLUtils.transactor[F](config.output.good)) - _ <- Resource.eval(SnowflakeRetrying.retryIndefinitely(snowflakeHealth)(SQLUtils.createTable(config.output.good, xa))) - tblManager = TableManager.fromTransactor(config.output.good, snowflakeHealth, xa) - channelProvider <- ChannelProvider.make(config.output.good, snowflakeHealth, config.batching) + _ <- Resource.eval(SnowflakeRetrying.retryIndefinitely(snowflakeHealth, config.retries)(SQLUtils.createTable(config.output.good, xa))) + tblManager = TableManager.fromTransactor(config.output.good, xa, snowflakeHealth, config.retries) + channelProvider <- ChannelProvider.make(config.output.good, snowflakeHealth, config.batching, config.retries) sourceAndAck <- Resource.eval(toSource(config.input)) _ <- HealthProbe.resource( config.monitoring.healthProbe.port, diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala index 7313953..1e0ec1e 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala @@ -110,7 +110,8 @@ object ChannelProvider { def make[F[_]: Async]( config: Config.Snowflake, snowflakeHealth: SnowflakeHealth[F], - batchingConfig: Config.Batching + batchingConfig: Config.Batching, + retriesConfig: Config.Retries ): Resource[F, ChannelProvider[F]] = for { client <- createClient(config, batchingConfig) @@ -118,14 +119,15 @@ object ChannelProvider { channel <- Resource.eval(hs.swap(createChannel(config, client))) ref <- Resource.eval(Ref[F].of(channel)) sem <- Resource.eval(Semaphore[F](allAvailablePermits)) - } yield impl(ref, hs, sem, createChannel(config, client), snowflakeHealth) + } yield impl(ref, hs, sem, createChannel(config, client), snowflakeHealth, retriesConfig) private def impl[F[_]: Async]( ref: Ref[F, SnowflakeStreamingIngestChannel], hs: Hotswap[F, SnowflakeStreamingIngestChannel], sem: Semaphore[F], next: Resource[F, SnowflakeStreamingIngestChannel], - snowflakeHealth: SnowflakeHealth[F] + snowflakeHealth: SnowflakeHealth[F], + retriesConfig: Config.Retries ): ChannelProvider[F] = new ChannelProvider[F] { def reset: F[Unit] = @@ -164,7 +166,9 @@ object ChannelProvider { for { channel <- ref.get response <- - SnowflakeRetrying.retryIndefinitely(snowflakeHealth)(Sync[F].blocking(channel.insertRows(rows.map(_.asJava).asJava, null))) + SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig)( + Sync[F].blocking(channel.insertRows(rows.map(_.asJava).asJava, null)) + ) _ <- flushChannel[F](channel) isValid <- Sync[F].delay(channel.isValid) } yield if (isValid) WriteResult.WriteFailures(parseResponse(response)) else WriteResult.ChannelIsInvalid diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeHealth.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeHealth.scala index 335d629..8bdfe1c 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeHealth.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeHealth.scala @@ -5,7 +5,7 @@ import cats.implicits._ import com.snowplowanalytics.snowplow.runtime.HealthProbe final case class SnowflakeHealth[F[_]](state: Ref[F, HealthProbe.Status]) { - def setUnhealthy(): F[Unit] = state.set(HealthProbe.Unhealthy("Snowflake is not healthy")) // TODO do we need more details here? + def setUnhealthy(): F[Unit] = state.set(HealthProbe.Unhealthy("Snowflake is not healthy")) def setHealthy(): F[Unit] = state.set(HealthProbe.Healthy) } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala index a7564f3..fa8feee 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala @@ -2,38 +2,30 @@ package com.snowplowanalytics.snowplow.snowflake.processing import cats.effect.Sync import cats.implicits._ -import net.snowflake.client.jdbc.SnowflakeSQLException +import com.snowplowanalytics.snowplow.snowflake.Config import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger import retry._ import retry.implicits.retrySyntaxError -import scala.concurrent.duration.DurationInt - object SnowflakeRetrying { private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] - def retryIndefinitely[F[_]: Sync: Sleep, A](snowflakeHealth: SnowflakeHealth[F])(action: F[A]): F[A] = - retryUntilSuccessful(snowflakeHealth, action) <* + def retryIndefinitely[F[_]: Sync: Sleep, A](snowflakeHealth: SnowflakeHealth[F], config: Config.Retries)(action: F[A]): F[A] = + retryUntilSuccessful(snowflakeHealth, config, action) <* snowflakeHealth.setHealthy() - private def retryUntilSuccessful[F[_]: Sync: Sleep, A](snowflakeHealth: SnowflakeHealth[F], action: F[A]) = - action.retryingOnSomeErrors( - isWorthRetrying = isWorthRetrying(_).pure[F], - policy = RetryPolicies.exponentialBackoff[F](baseDelay = 1.minute), // TODO make it configurable - onError = (ex, _) => handleError(snowflakeHealth, ex) - ) - - private def handleError[F[_]: Sync](snowflakeHealth: SnowflakeHealth[F], ex: Throwable): F[Unit] = - snowflakeHealth.setUnhealthy() *> - Logger[F].error(ex)("Executing Snowflake command failed") - - private def isWorthRetrying(error: Throwable): Boolean = error match { - // TODO Retry for some specific error codes or simply for any snowflake exception? - case se: SnowflakeSQLException if se.getErrorCode === 2222 => - true - case _ => true - } - + private def retryUntilSuccessful[F[_]: Sync: Sleep, A]( + snowflakeHealth: SnowflakeHealth[F], + config: Config.Retries, + action: F[A] + ) = + action + .onError(_ => snowflakeHealth.setUnhealthy()) + .retryingOnAllErrors( + policy = RetryPolicies.exponentialBackoff[F](config.backoff), + onError = (error, retryDetails) => + Logger[F].error(error)(s"Executing Snowflake command failed. Retries so far: ${retryDetails.retriesSoFar}") + ) } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala index 0d08e7d..ca103c1 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala @@ -31,11 +31,12 @@ object TableManager { def fromTransactor[F[_]: Async]( config: Config.Snowflake, + xa: Transactor[F], snowflakeHealth: SnowflakeHealth[F], - xa: Transactor[F] + retriesConfig: Config.Retries ): TableManager[F] = new TableManager[F] { - def addColumns(columns: List[String]): F[Unit] = SnowflakeRetrying.retryIndefinitely(snowflakeHealth) { + def addColumns(columns: List[String]): F[Unit] = SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) { Logger[F].info(s"Altering table to add columns [${columns.mkString(", ")}]") *> xa.rawTrans.apply { columns.traverse_ { col =>