Skip to content

Commit

Permalink
Don't crash app for Snowflake failures (close #11)
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Dec 7, 2023
1 parent 01a4f97 commit ceb01e5
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 44 deletions.
6 changes: 6 additions & 0 deletions config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@
# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
}

# Retry configuration for Snowflake operation failures
"retries": {
# Starting backoff period
"backoff": "30 seconds"
}

# -- Schemas that won't be loaded to Snowflake. Optional, default value []
"skipSchemas": [
Expand Down
6 changes: 6 additions & 0 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@
# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
}

# Retry configuration for Snowflake operation failures
"retries": {
# Starting backoff period
"backoff": "30 seconds"
}

# -- Schemas that won't be loaded to Snowflake. Optional, default value []
"skipSchemas": [
Expand Down
19 changes: 6 additions & 13 deletions config/config.pubsub.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,12 @@
# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
}

"batching": {

# - Events are emitted to Snowflake when the batch reaches this size in bytes
"maxBytes": 16000000

# - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
"maxDelay": "1 second"

# - How many batches can we send simultaneously over the network to Snowflake.
# - The Snowflake SDK dictates this should be kept below the number of available runtime processors (cpu)
"uploadConcurrency": 1
}

# Retry configuration for Snowflake operation failures
"retries": {
# Starting backoff period
"backoff": "30 seconds"
}

# -- Schemas that won't be loaded to Snowflake. Optional, default value []
"skipSchemas": [
Expand Down
4 changes: 4 additions & 0 deletions modules/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
"uploadConcurrency": 3
}

"retries": {
"backoff": "30 seconds"
}

"skipSchemas": []

"monitoring": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.snowplowanalytics.snowplow.snowflake

import cats.implicits._
import cats.{Functor, Monad, Monoid}
import com.snowplowanalytics.snowplow.runtime.HealthProbe
import com.snowplowanalytics.snowplow.runtime.HealthProbe.{Healthy, Unhealthy}
import com.snowplowanalytics.snowplow.snowflake.processing.SnowflakeHealth
import com.snowplowanalytics.snowplow.sources.SourceAndAck

object AppHealth {

def isHealthy[F[_]: Monad](
config: Config.HealthProbe,
source: SourceAndAck[F],
snowflakeHealth: SnowflakeHealth[F]
): F[HealthProbe.Status] =
List(
latencyHealth(config, source),
snowflakeHealth.state.get
).foldA

private def latencyHealth[F[_]: Functor](config: Config.HealthProbe, source: SourceAndAck[F]): F[HealthProbe.Status] =
source.processingLatency.map { latency =>
if (latency > config.unhealthyLatency)
Unhealthy(show"Processing latency is $latency")
else
Healthy
}

private val combineHealth: (HealthProbe.Status, HealthProbe.Status) => HealthProbe.Status = {
case (Healthy, Healthy) => Healthy
case (Healthy, unhealthy) => unhealthy
case (unhealthy, Healthy) => unhealthy
case (Unhealthy(first), Unhealthy(second)) => Unhealthy(reason = s"$first, $second")
}

private implicit val healthMonoid: Monoid[HealthProbe.Status] = Monoid.instance(Healthy, combineHealth)
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ case class Config[+Source, +Sink](
input: Source,
output: Config.Output[Sink],
batching: Config.Batching,
retries: Config.Retries,
skipSchemas: List[SchemaCriterion],
telemetry: Telemetry.Config,
monitoring: Config.Monitoring
Expand Down Expand Up @@ -75,6 +76,8 @@ object Config {
healthProbe: HealthProbe
)

case class Retries(backoff: FiniteDuration)

implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
implicit val configuration = Configuration.default.withDiscriminator("type")
implicit val urlDecoder = Decoder.decodeString.emapTry { str =>
Expand All @@ -93,6 +96,7 @@ object Config {
implicit val metricsDecoder = deriveConfiguredDecoder[Metrics]
implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe]
implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
implicit val retriesDecoder = deriveConfiguredDecoder[Retries]
deriveConfiguredDecoder[Config[Source, Sink]]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,17 @@
*/
package com.snowplowanalytics.snowplow.snowflake

import cats.implicits._
import cats.Functor
import cats.effect.{Async, Resource, Sync}
import cats.effect.unsafe.implicits.global
import cats.effect.{Async, Resource, Sync}
import cats.implicits._
import com.snowplowanalytics.iglu.core.SchemaCriterion
import org.http4s.client.Client
import org.http4s.blaze.client.BlazeClientBuilder
import io.sentry.Sentry
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, TableManager}
import com.snowplowanalytics.snowplow.runtime.{AppInfo, HealthProbe}
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeHealth, SnowflakeRetrying, TableManager}
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import io.sentry.Sentry
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.client.Client

case class Environment[F[_]](
appInfo: AppInfo,
Expand All @@ -41,16 +40,21 @@ object Environment {
toSink: SinkConfig => Resource[F, Sink[F]]
): Resource[F, Environment[F]] =
for {
snowflakeHealth <- Resource.eval(SnowflakeHealth.initUnhealthy[F])
sourceAndAck <- Resource.eval(toSource(config.input))
_ <- HealthProbe.resource(
config.monitoring.healthProbe.port,
AppHealth.isHealthy(config.monitoring.healthProbe, sourceAndAck, snowflakeHealth)
)
_ <- enableSentry[F](appInfo, config.monitoring.sentry)
httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
badSink <- toSink(config.output.bad)
metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
xa <- Resource.eval(SQLUtils.transactor[F](config.output.good))
_ <- Resource.eval(SQLUtils.createTable(config.output.good, xa))
tblManager = TableManager.fromTransactor(config.output.good, xa)
channelProvider <- ChannelProvider.make(config.output.good, config.batching)
sourceAndAck <- Resource.eval(toSource(config.input))
_ <- HealthProbe.resource(config.monitoring.healthProbe.port, isHealthy(config.monitoring.healthProbe, sourceAndAck))
_ <- Resource.eval(SnowflakeRetrying.retryIndefinitely(snowflakeHealth, config.retries)(SQLUtils.createTable(config.output.good, xa)))
tblManager = TableManager.fromTransactor(config.output.good, xa, snowflakeHealth, config.retries)
channelProvider <- ChannelProvider.make(config.output.good, snowflakeHealth, config.batching, config.retries)

} yield Environment(
appInfo = appInfo,
source = sourceAndAck,
Expand Down Expand Up @@ -83,13 +87,4 @@ object Environment {
case None =>
Resource.unit[F]
}

private def isHealthy[F[_]: Functor](config: Config.HealthProbe, source: SourceAndAck[F]): F[HealthProbe.Status] =
source.processingLatency.map { latency =>
if (latency > config.unhealthyLatency)
HealthProbe.Unhealthy(show"Processing latency is $latency")
else
HealthProbe.Healthy
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,19 @@ object ChannelProvider {
/** A large number so we don't limit the number of permits for calls to `flush` and `enqueue` */
private val allAvailablePermits: Long = Long.MaxValue

def make[F[_]: Async](config: Config.Snowflake, batchingConfig: Config.Batching): Resource[F, ChannelProvider[F]] =
def make[F[_]: Async](
config: Config.Snowflake,
snowflakeHealth: SnowflakeHealth[F],
batchingConfig: Config.Batching,
retriesConfig: Config.Retries
): Resource[F, ChannelProvider[F]] =
for {
client <- createClient(config, batchingConfig)
hs <- Hotswap.create[F, SnowflakeStreamingIngestChannel]
channel <- Resource.eval(hs.swap(createChannel(config, client)))
channelResource = createChannel(config, client, snowflakeHealth, retriesConfig)
(hs, channel) <- Hotswap.apply(channelResource)
ref <- Resource.eval(Ref[F].of(channel))
sem <- Resource.eval(Semaphore[F](allAvailablePermits))
} yield impl(ref, hs, sem, createChannel(config, client))
} yield impl(ref, hs, sem, channelResource)

private def impl[F[_]: Async](
ref: Ref[F, SnowflakeStreamingIngestChannel],
Expand Down Expand Up @@ -189,7 +194,9 @@ object ChannelProvider {

private def createChannel[F[_]: Async](
config: Config.Snowflake,
client: SnowflakeStreamingIngestClient
client: SnowflakeStreamingIngestClient,
snowflakeHealth: SnowflakeHealth[F],
retriesConfig: Config.Retries
): Resource[F, SnowflakeStreamingIngestChannel] = {
val request = OpenChannelRequest
.builder(config.channel)
Expand All @@ -201,7 +208,9 @@ object ChannelProvider {
.build

val make = Logger[F].info(s"Opening channel ${config.channel}") *>
Async[F].blocking(client.openChannel(request))
SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) {
Async[F].blocking(client.openChannel(request))
}

Resource.make(make) { channel =>
Logger[F].info(s"Closing channel ${config.channel}") *>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.snowplowanalytics.snowplow.snowflake.processing

import cats.effect.{Concurrent, Ref}
import cats.implicits._
import com.snowplowanalytics.snowplow.runtime.HealthProbe
import com.snowplowanalytics.snowplow.snowflake.processing.SnowflakeHealth.unhealthy

final case class SnowflakeHealth[F[_]](state: Ref[F, HealthProbe.Status]) {
def setUnhealthy(): F[Unit] = state.set(unhealthy)
def setHealthy(): F[Unit] = state.set(HealthProbe.Healthy)
}

object SnowflakeHealth {
private val unhealthy = HealthProbe.Unhealthy("Snowflake connection is not healthy")

def initUnhealthy[F[_]: Concurrent]: F[SnowflakeHealth[F]] =
Ref
.of[F, HealthProbe.Status](unhealthy)
.map(SnowflakeHealth.apply)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.snowplowanalytics.snowplow.snowflake.processing

import cats.effect.Sync
import cats.implicits._
import com.snowplowanalytics.snowplow.snowflake.Config
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import retry._
import retry.implicits.retrySyntaxError

object SnowflakeRetrying {

private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

def retryIndefinitely[F[_]: Sync: Sleep, A](snowflakeHealth: SnowflakeHealth[F], config: Config.Retries)(action: F[A]): F[A] =
retryUntilSuccessful(snowflakeHealth, config, action) <*
snowflakeHealth.setHealthy()

private def retryUntilSuccessful[F[_]: Sync: Sleep, A](
snowflakeHealth: SnowflakeHealth[F],
config: Config.Retries,
action: F[A]
): F[A] =
action
.onError(_ => snowflakeHealth.setUnhealthy())
.retryingOnAllErrors(
policy = RetryPolicies.exponentialBackoff[F](config.backoff),
onError = (error, details) => Logger[F].error(error)(s"Executing Snowflake command failed. ${extractRetryDetails(details)}")
)

private def extractRetryDetails(details: RetryDetails): String = details match {
case RetryDetails.GivingUp(totalRetries, totalDelay) =>
s"Giving up on retrying, total retries: $totalRetries, total delay: ${totalDelay.toSeconds} seconds"
case RetryDetails.WillDelayAndRetry(nextDelay, retriesSoFar, cumulativeDelay) =>
s"Will retry in ${nextDelay.toSeconds} seconds, retries so far: $retriesSoFar, total delay so far: ${cumulativeDelay.toSeconds} seconds"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ object TableManager {

def fromTransactor[F[_]: Async](
config: Config.Snowflake,
xa: Transactor[F]
xa: Transactor[F],
snowflakeHealth: SnowflakeHealth[F],
retriesConfig: Config.Retries
): TableManager[F] = new TableManager[F] {

def addColumns(columns: List[String]): F[Unit] =
def addColumns(columns: List[String]): F[Unit] = SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) {
Logger[F].info(s"Altering table to add columns [${columns.mkString(", ")}]") *>
xa.rawTrans.apply {
columns.traverse_ { col =>
Expand All @@ -45,6 +47,7 @@ object TableManager {
}
}
}
}
}

private val reUnstruct: Regex = "^unstruct_event_.*$".r
Expand Down

0 comments on commit ceb01e5

Please sign in to comment.