Skip to content

Commit

Permalink
Don't crash app for Snowflake failures
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Dec 4, 2023
1 parent 30b2d18 commit b8d60aa
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.snowplowanalytics.snowplow.snowflake

import cats.implicits._
import cats.{Functor, Monad, Monoid}
import com.snowplowanalytics.snowplow.runtime.HealthProbe
import com.snowplowanalytics.snowplow.runtime.HealthProbe.{Healthy, Unhealthy}
import com.snowplowanalytics.snowplow.snowflake.processing.SnowflakeHealth
import com.snowplowanalytics.snowplow.sources.SourceAndAck

object AppHealth {

def isHealthy[F[_]: Monad](
config: Config.HealthProbe,
source: SourceAndAck[F],
snowflakeHealth: SnowflakeHealth[F]
): F[HealthProbe.Status] =
List(
latencyHealth(config, source),
snowflakeHealth.state.get
).sequence.map(_.combineAll)

private def latencyHealth[F[_]: Functor](config: Config.HealthProbe, source: SourceAndAck[F]): F[HealthProbe.Status] =
source.processingLatency.map { latency =>
if (latency > config.unhealthyLatency)
Unhealthy(show"Processing latency is $latency")
else
Healthy
}

private val combineHealth: (HealthProbe.Status, HealthProbe.Status) => HealthProbe.Status = {
case (Healthy, Healthy) => Healthy
case (Healthy, unhealthy) => unhealthy
case (unhealthy, Healthy) => unhealthy
case (Unhealthy(first), Unhealthy(second)) => Unhealthy(reason = s"$first, $second") // TODO do we want to combine reasons like that...?
}

private implicit val healthMonoid: Monoid[HealthProbe.Status] = Monoid.instance(Healthy, combineHealth)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,17 @@
*/
package com.snowplowanalytics.snowplow.snowflake

import cats.implicits._
import cats.Functor
import cats.effect.{Async, Resource, Sync}
import cats.effect.unsafe.implicits.global
import cats.effect.{Async, Resource, Sync}
import cats.implicits._
import com.snowplowanalytics.iglu.core.SchemaCriterion
import org.http4s.client.Client
import org.http4s.blaze.client.BlazeClientBuilder
import io.sentry.Sentry
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, TableManager}
import com.snowplowanalytics.snowplow.runtime.{AppInfo, HealthProbe}
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeActionRunner, SnowflakeHealth, TableManager}
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import io.sentry.Sentry
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.client.Client

case class Environment[F[_]](
appInfo: AppInfo,
Expand All @@ -28,6 +27,7 @@ case class Environment[F[_]](
tblManager: TableManager[F],
channelProvider: ChannelProvider[F],
metrics: Metrics[F],
snowflakeRunner: SnowflakeActionRunner[F],
batching: Config.Batching,
schemasToSkip: List[SchemaCriterion]
)
Expand All @@ -41,16 +41,21 @@ object Environment {
toSink: SinkConfig => Resource[F, Sink[F]]
): Resource[F, Environment[F]] =
for {
snowflakeHealth <- Resource.eval(SnowflakeHealth.initHealthy[F])
snowflakeRunner = SnowflakeActionRunner.retryingIndefinitely[F](snowflakeHealth)
_ <- enableSentry[F](appInfo, config.monitoring.sentry)
httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
badSink <- toSink(config.output.bad)
metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
xa <- Resource.eval(SQLUtils.transactor[F](config.output.good))
_ <- Resource.eval(SQLUtils.createTable(config.output.good, xa))
_ <- Resource.eval(snowflakeRunner.run(SQLUtils.createTable(config.output.good, xa)))
tblManager = TableManager.fromTransactor(config.output.good, xa)
channelProvider <- ChannelProvider.make(config.output.good, config.batching)
sourceAndAck <- Resource.eval(toSource(config.input))
_ <- HealthProbe.resource(config.monitoring.healthProbe.port, isHealthy(config.monitoring.healthProbe, sourceAndAck))
_ <- HealthProbe.resource(
config.monitoring.healthProbe.port,
AppHealth.isHealthy(config.monitoring.healthProbe, sourceAndAck, snowflakeHealth)
)
} yield Environment(
appInfo = appInfo,
source = sourceAndAck,
Expand All @@ -59,6 +64,7 @@ object Environment {
tblManager = tblManager,
channelProvider = channelProvider,
metrics = metrics,
snowflakeRunner = snowflakeRunner,
batching = config.batching,
schemasToSkip = config.skipSchemas
)
Expand All @@ -83,13 +89,4 @@ object Environment {
case None =>
Resource.unit[F]
}

private def isHealthy[F[_]: Functor](config: Config.HealthProbe, source: SourceAndAck[F]): F[HealthProbe.Status] =
source.processingLatency.map { latency =>
if (latency > config.unhealthyLatency)
HealthProbe.Unhealthy(show"Processing latency is $latency")
else
HealthProbe.Healthy
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ object Processing {
batch.pure[F]
else
Sync[F].untilDefinedM {
env.channelProvider.write(batch.toBeInserted.asIterable.map(_._2)).flatMap {
env.snowflakeRunner.run(env.channelProvider.write(batch.toBeInserted.asIterable.map(_._2))).flatMap {
case ChannelProvider.WriteResult.ChannelIsInvalid =>
// Reset the channel and immediately try again
env.channelProvider.reset.as(none)
Expand Down Expand Up @@ -339,7 +339,7 @@ object Processing {
().pure[F]
else
env.channelProvider.withClosedChannel {
env.tblManager.addColumns(extraColsRequired.toList)
env.snowflakeRunner.run(env.tblManager.addColumns(extraColsRequired.toList))
}

private def sendFailedEvents[F[_]: Applicative, A](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.snowplowanalytics.snowplow.snowflake.processing

import cats.effect.Sync
import cats.implicits._
import net.snowflake.client.jdbc.SnowflakeSQLException
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import retry._
import retry.implicits.retrySyntaxError

import scala.concurrent.duration.DurationInt

trait SnowflakeActionRunner[F[_]] {
def run[A](action: F[A]): F[A]
}

object SnowflakeActionRunner {

private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

def retryingIndefinitely[F[_]: Sync: Sleep](snowflakeHealth: SnowflakeHealth[F]): SnowflakeActionRunner[F] =
new SnowflakeActionRunner[F] {
override def run[A](action: F[A]): F[A] =
retryUntilSuccessful(snowflakeHealth, action) <*
snowflakeHealth.setHealthy()
}

private def retryUntilSuccessful[F[_]: Sync: Sleep, A](snowflakeHealth: SnowflakeHealth[F], action: F[A]) =
action.retryingOnSomeErrors(
isWorthRetrying = isWorthRetrying(_).pure[F],
policy = RetryPolicies.exponentialBackoff[F](baseDelay = 1.minute), // TODO make it configurable
onError = (ex, _) => handleError(snowflakeHealth, ex)
)

private def handleError[F[_]: Sync](snowflakeHealth: SnowflakeHealth[F], ex: Throwable): F[Unit] =
snowflakeHealth.setUnhealthy() *>
Logger[F].error(ex)("Executing Snowflake command failed")

private def isWorthRetrying(error: Throwable): Boolean = error match {
// TODO Retry for some specific error codes or simply for any snowflake exception?
case se: SnowflakeSQLException if se.getErrorCode === 2222 =>
true
case _ => true
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.snowplowanalytics.snowplow.snowflake.processing

import cats.effect.{Concurrent, Ref}
import cats.implicits._
import com.snowplowanalytics.snowplow.runtime.HealthProbe

final case class SnowflakeHealth[F[_]](state: Ref[F, HealthProbe.Status]) {
def setUnhealthy(): F[Unit] = state.set(HealthProbe.Unhealthy("Snowflake is not healthy")) // TODO do we need more details here?
def setHealthy(): F[Unit] = state.set(HealthProbe.Healthy)
}

object SnowflakeHealth {
def initHealthy[F[_]: Concurrent]: F[SnowflakeHealth[F]] =
Ref
.of[F, HealthProbe.Status](HealthProbe.Healthy)
.map(SnowflakeHealth.apply)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import fs2.Stream

import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, TableManager}
import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeActionRunner, TableManager}
import com.snowplowanalytics.snowplow.runtime.AppInfo

import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
Expand Down Expand Up @@ -60,6 +60,7 @@ object MockEnvironment {
tblManager = testTableManager(state),
channelProvider = channelProvider,
metrics = testMetrics(state),
snowflakeRunner = testSnowflakeRunner(),
batching = Config.Batching(
maxBytes = 16000000,
maxDelay = 10.seconds,
Expand Down Expand Up @@ -158,4 +159,8 @@ object MockEnvironment {

def report: Stream[IO, Nothing] = Stream.never[IO]
}

private def testSnowflakeRunner() = new SnowflakeActionRunner[IO] {
override def run[A](action: IO[A]): IO[A] = action
}
}

0 comments on commit b8d60aa

Please sign in to comment.