Add retry config
pondzix committed Dec 4, 2023
1 parent 6ebcb9f commit bbaa76a
Showing 11 changed files with 57 additions and 47 deletions.
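In summary: the hard-coded 1-minute retry delay is replaced by a configurable `retries.backoff` setting (default 30 seconds), a new `Config.Retries` class is threaded through `Environment`, `ChannelProvider`, and `TableManager`, and `SnowflakeRetrying` now retries all errors with exponential backoff instead of inspecting Snowflake error codes. Several leftover TODO comments are also resolved.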
6 changes: 6 additions & 0 deletions config/config.azure.reference.hocon
@@ -78,6 +78,12 @@
# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
}

+# Retry configuration for Snowflake operation failures
+"retries": {
+  # Starting backoff period
+  "backoff": "30 seconds"
+}

# -- Schemas that won't be loaded to Snowflake. Optional, default value []
"skipSchemas": [
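Note on semantics: `backoff` is the starting delay for cats-retry's `RetryPolicies.exponentialBackoff`, which doubles the delay on each failed attempt (30 s, 1 min, 2 min, 4 min, ...) and retries indefinitely; the previous behaviour was a fixed, non-configurable 1-minute base delay.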
6 changes: 6 additions & 0 deletions config/config.kinesis.reference.hocon
@@ -99,6 +99,12 @@
# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
}

+# Retry configuration for Snowflake operation failures
+"retries": {
+  # Starting backoff period
+  "backoff": "30 seconds"
+}

# -- Schemas that won't be loaded to Snowflake. Optional, default value []
"skipSchemas": [
19 changes: 6 additions & 13 deletions config/config.pubsub.reference.hocon
@@ -79,19 +79,12 @@
# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
}

"batching": {

# - Events are emitted to Snowflake when the batch reaches this size in bytes
"maxBytes": 16000000

# - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
"maxDelay": "1 second"

# - How many batches can we send simultaneously over the network to Snowflake.
# - The Snowflake SDK dictates this should be kept below the number of available runtime processors (cpu)
"uploadConcurrency": 1
}

+# Retry configuration for Snowflake operation failures
+"retries": {
+  # Starting backoff period
+  "backoff": "30 seconds"
+}

# -- Schemas that won't be loaded to Snowflake. Optional, default value []
"skipSchemas": [
4 changes: 4 additions & 0 deletions modules/core/src/main/resources/reference.conf
@@ -16,6 +16,10 @@
"uploadConcurrency": 3
}

"retries": {
"backoff": "30 seconds"
}

"skipSchemas": []

"monitoring": {
@@ -31,7 +31,7 @@ object AppHealth {
case (Healthy, Healthy) => Healthy
case (Healthy, unhealthy) => unhealthy
case (unhealthy, Healthy) => unhealthy
-case (Unhealthy(first), Unhealthy(second)) => Unhealthy(reason = s"$first, $second") // TODO do we want to combine reasons like that...?
+case (Unhealthy(first), Unhealthy(second)) => Unhealthy(reason = s"$first, $second")
}

private implicit val healthMonoid: Monoid[HealthProbe.Status] = Monoid.instance(Healthy, combineHealth)
@@ -26,6 +26,7 @@ case class Config[+Source, +Sink](
input: Source,
output: Config.Output[Sink],
batching: Config.Batching,
+retries: Config.Retries,
skipSchemas: List[SchemaCriterion],
telemetry: Telemetry.Config,
monitoring: Config.Monitoring
@@ -75,6 +76,8 @@
healthProbe: HealthProbe
)

+case class Retries(backoff: FiniteDuration)

implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
implicit val configuration = Configuration.default.withDiscriminator("type")
implicit val urlDecoder = Decoder.decodeString.emapTry { str =>
@@ -93,6 +96,7 @@
implicit val metricsDecoder = deriveConfiguredDecoder[Metrics]
implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe]
implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
+implicit val retriesDecoder = deriveConfiguredDecoder[Retries]
deriveConfiguredDecoder[Config[Source, Sink]]
}

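For illustration, a minimal self-contained sketch of how the new `retries` block decodes into `Config.Retries`. The `FiniteDuration` decoder below is an assumption standing in for whatever decoder the loader already has in implicit scope; the real project derives `retriesDecoder` inside `Config.decoder` as shown above:

import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.Try
import io.circe.Decoder
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredDecoder
import io.circe.parser.decode

object RetriesDecodingSketch extends App {
  case class Retries(backoff: FiniteDuration)

  implicit val configuration: Configuration = Configuration.default

  // Assumed stand-in: parse strings like "30 seconds" via scala.concurrent.duration
  implicit val finiteDurationDecoder: Decoder[FiniteDuration] =
    Decoder.decodeString.emapTry { str =>
      Try(Duration(str)).collect { case fd: FiniteDuration => fd }
    }

  implicit val retriesDecoder: Decoder[Retries] = deriveConfiguredDecoder[Retries]

  // HOCON is rendered to JSON before circe sees it, so "backoff" arrives as a string
  println(decode[Retries]("""{ "backoff": "30 seconds" }"""))
  // Right(Retries(30 seconds))
}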
@@ -46,9 +46,9 @@ object Environment {
badSink <- toSink(config.output.bad)
metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
xa <- Resource.eval(SQLUtils.transactor[F](config.output.good))
-_ <- Resource.eval(SnowflakeRetrying.retryIndefinitely(snowflakeHealth)(SQLUtils.createTable(config.output.good, xa)))
-tblManager = TableManager.fromTransactor(config.output.good, snowflakeHealth, xa)
-channelProvider <- ChannelProvider.make(config.output.good, snowflakeHealth, config.batching)
+_ <- Resource.eval(SnowflakeRetrying.retryIndefinitely(snowflakeHealth, config.retries)(SQLUtils.createTable(config.output.good, xa)))
+tblManager = TableManager.fromTransactor(config.output.good, xa, snowflakeHealth, config.retries)
+channelProvider <- ChannelProvider.make(config.output.good, snowflakeHealth, config.batching, config.retries)
sourceAndAck <- Resource.eval(toSource(config.input))
_ <- HealthProbe.resource(
config.monitoring.healthProbe.port,
@@ -110,22 +110,24 @@ object ChannelProvider {
def make[F[_]: Async](
config: Config.Snowflake,
snowflakeHealth: SnowflakeHealth[F],
-batchingConfig: Config.Batching
+batchingConfig: Config.Batching,
+retriesConfig: Config.Retries
): Resource[F, ChannelProvider[F]] =
for {
client <- createClient(config, batchingConfig)
hs <- Hotswap.create[F, SnowflakeStreamingIngestChannel]
channel <- Resource.eval(hs.swap(createChannel(config, client)))
ref <- Resource.eval(Ref[F].of(channel))
sem <- Resource.eval(Semaphore[F](allAvailablePermits))
-} yield impl(ref, hs, sem, createChannel(config, client), snowflakeHealth)
+} yield impl(ref, hs, sem, createChannel(config, client), snowflakeHealth, retriesConfig)

private def impl[F[_]: Async](
ref: Ref[F, SnowflakeStreamingIngestChannel],
hs: Hotswap[F, SnowflakeStreamingIngestChannel],
sem: Semaphore[F],
next: Resource[F, SnowflakeStreamingIngestChannel],
-snowflakeHealth: SnowflakeHealth[F]
+snowflakeHealth: SnowflakeHealth[F],
+retriesConfig: Config.Retries
): ChannelProvider[F] =
new ChannelProvider[F] {
def reset: F[Unit] =
@@ -164,7 +166,9 @@
for {
channel <- ref.get
response <-
-SnowflakeRetrying.retryIndefinitely(snowflakeHealth)(Sync[F].blocking(channel.insertRows(rows.map(_.asJava).asJava, null)))
+SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig)(
+  Sync[F].blocking(channel.insertRows(rows.map(_.asJava).asJava, null))
+)
_ <- flushChannel[F](channel)
isValid <- Sync[F].delay(channel.isValid)
} yield if (isValid) WriteResult.WriteFailures(parseResponse(response)) else WriteResult.ChannelIsInvalid
@@ -5,7 +5,7 @@ import cats.implicits._
import com.snowplowanalytics.snowplow.runtime.HealthProbe

final case class SnowflakeHealth[F[_]](state: Ref[F, HealthProbe.Status]) {
-def setUnhealthy(): F[Unit] = state.set(HealthProbe.Unhealthy("Snowflake is not healthy")) // TODO do we need more details here?
+def setUnhealthy(): F[Unit] = state.set(HealthProbe.Unhealthy("Snowflake is not healthy"))
def setHealthy(): F[Unit] = state.set(HealthProbe.Healthy)
}

@@ -2,38 +2,30 @@ package com.snowplowanalytics.snowplow.snowflake.processing

import cats.effect.Sync
import cats.implicits._
-import net.snowflake.client.jdbc.SnowflakeSQLException
+import com.snowplowanalytics.snowplow.snowflake.Config
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import retry._
import retry.implicits.retrySyntaxError

-import scala.concurrent.duration.DurationInt

object SnowflakeRetrying {

private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

-def retryIndefinitely[F[_]: Sync: Sleep, A](snowflakeHealth: SnowflakeHealth[F])(action: F[A]): F[A] =
-  retryUntilSuccessful(snowflakeHealth, action) <*
+def retryIndefinitely[F[_]: Sync: Sleep, A](snowflakeHealth: SnowflakeHealth[F], config: Config.Retries)(action: F[A]): F[A] =
+  retryUntilSuccessful(snowflakeHealth, config, action) <*
snowflakeHealth.setHealthy()

-private def retryUntilSuccessful[F[_]: Sync: Sleep, A](snowflakeHealth: SnowflakeHealth[F], action: F[A]) =
-  action.retryingOnSomeErrors(
-    isWorthRetrying = isWorthRetrying(_).pure[F],
-    policy = RetryPolicies.exponentialBackoff[F](baseDelay = 1.minute), // TODO make it configurable
-    onError = (ex, _) => handleError(snowflakeHealth, ex)
-  )
-
-private def handleError[F[_]: Sync](snowflakeHealth: SnowflakeHealth[F], ex: Throwable): F[Unit] =
-  snowflakeHealth.setUnhealthy() *>
-    Logger[F].error(ex)("Executing Snowflake command failed")
-
-private def isWorthRetrying(error: Throwable): Boolean = error match {
-  // TODO Retry for some specific error codes or simply for any snowflake exception?
-  case se: SnowflakeSQLException if se.getErrorCode === 2222 =>
-    true
-  case _ => true
-}

+private def retryUntilSuccessful[F[_]: Sync: Sleep, A](
+  snowflakeHealth: SnowflakeHealth[F],
+  config: Config.Retries,
+  action: F[A]
+) =
+  action
+    .onError(_ => snowflakeHealth.setUnhealthy())
+    .retryingOnAllErrors(
+      policy = RetryPolicies.exponentialBackoff[F](config.backoff),
+      onError = (error, retryDetails) =>
+        Logger[F].error(error)(s"Executing Snowflake command failed. Retries so far: ${retryDetails.retriesSoFar}")
+    )
}
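As a usage sketch (the `runDdl` thunk and the concrete IO effect are hypothetical; the real call sites are in Environment, ChannelProvider, and TableManager):

package com.snowplowanalytics.snowplow.snowflake.processing

import cats.effect.IO
import scala.concurrent.duration._

import com.snowplowanalytics.snowplow.snowflake.Config

object RetrySketch {
  // Retry a blocking Snowflake action indefinitely: the health probe is set
  // unhealthy on every error and back to healthy once the action succeeds.
  def createTableWithRetries(snowflakeHealth: SnowflakeHealth[IO], runDdl: () => Unit): IO[Unit] = {
    val retries = Config.Retries(backoff = 30.seconds)
    SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retries) {
      IO.blocking(runDdl())
    }
  }
}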
@@ -31,11 +31,12 @@ object TableManager {

def fromTransactor[F[_]: Async](
config: Config.Snowflake,
+xa: Transactor[F],
snowflakeHealth: SnowflakeHealth[F],
-xa: Transactor[F]
+retriesConfig: Config.Retries
): TableManager[F] = new TableManager[F] {

-def addColumns(columns: List[String]): F[Unit] = SnowflakeRetrying.retryIndefinitely(snowflakeHealth) {
+def addColumns(columns: List[String]): F[Unit] = SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) {
Logger[F].info(s"Altering table to add columns [${columns.mkString(", ")}]") *>
xa.rawTrans.apply {
columns.traverse_ { col =>
