diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala index f936398..502225d 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala @@ -13,7 +13,7 @@ import cats.implicits._ import com.snowplowanalytics.iglu.core.SchemaCriterion import com.snowplowanalytics.snowplow.runtime.{AppInfo, HealthProbe} import com.snowplowanalytics.snowplow.sinks.Sink -import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeActionRunner, SnowflakeHealth, TableManager} +import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeHealth, SnowflakeRetrying, TableManager} import com.snowplowanalytics.snowplow.sources.SourceAndAck import io.sentry.Sentry import org.http4s.blaze.client.BlazeClientBuilder @@ -27,7 +27,6 @@ case class Environment[F[_]]( tblManager: TableManager[F], channelProvider: ChannelProvider[F], metrics: Metrics[F], - snowflakeRunner: SnowflakeActionRunner[F], batching: Config.Batching, schemasToSkip: List[SchemaCriterion] ) @@ -42,15 +41,14 @@ object Environment { ): Resource[F, Environment[F]] = for { snowflakeHealth <- Resource.eval(SnowflakeHealth.initHealthy[F]) - snowflakeRunner = SnowflakeActionRunner.retryingIndefinitely[F](snowflakeHealth) _ <- enableSentry[F](appInfo, config.monitoring.sentry) httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource badSink <- toSink(config.output.bad) metrics <- Resource.eval(Metrics.build(config.monitoring.metrics)) xa <- Resource.eval(SQLUtils.transactor[F](config.output.good)) - _ <- Resource.eval(snowflakeRunner.run(SQLUtils.createTable(config.output.good, xa))) - tblManager = TableManager.fromTransactor(config.output.good, xa) - channelProvider <- ChannelProvider.make(config.output.good, config.batching) + _ <- Resource.eval(SnowflakeRetrying.retryIndefinitely(snowflakeHealth)(SQLUtils.createTable(config.output.good, xa))) + tblManager = TableManager.fromTransactor(config.output.good, snowflakeHealth, xa) + channelProvider <- ChannelProvider.make(config.output.good, snowflakeHealth, config.batching) sourceAndAck <- Resource.eval(toSource(config.input)) _ <- HealthProbe.resource( config.monitoring.healthProbe.port, @@ -64,7 +62,6 @@ object Environment { tblManager = tblManager, channelProvider = channelProvider, metrics = metrics, - snowflakeRunner = snowflakeRunner, batching = config.batching, schemasToSkip = config.skipSchemas ) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala index 46fff90..7313953 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala @@ -107,20 +107,25 @@ object ChannelProvider { /** A large number so we don't limit the number of permits for calls to `flush` and `enqueue` */ private val allAvailablePermits: Long = Long.MaxValue - def make[F[_]: Async](config: Config.Snowflake, batchingConfig: Config.Batching): Resource[F, ChannelProvider[F]] = + def make[F[_]: Async]( + config: Config.Snowflake, + snowflakeHealth: SnowflakeHealth[F], + batchingConfig: Config.Batching + ): Resource[F, ChannelProvider[F]] = for { client <- createClient(config, batchingConfig) hs <- Hotswap.create[F, SnowflakeStreamingIngestChannel] channel <- Resource.eval(hs.swap(createChannel(config, client))) ref <- Resource.eval(Ref[F].of(channel)) sem <- Resource.eval(Semaphore[F](allAvailablePermits)) - } yield impl(ref, hs, sem, createChannel(config, client)) + } yield impl(ref, hs, sem, createChannel(config, client), snowflakeHealth) private def impl[F[_]: Async]( ref: Ref[F, SnowflakeStreamingIngestChannel], hs: Hotswap[F, SnowflakeStreamingIngestChannel], sem: Semaphore[F], - next: Resource[F, SnowflakeStreamingIngestChannel] + next: Resource[F, SnowflakeStreamingIngestChannel], + snowflakeHealth: SnowflakeHealth[F] ): ChannelProvider[F] = new ChannelProvider[F] { def reset: F[Unit] = @@ -158,7 +163,8 @@ object ChannelProvider { .use[WriteResult] { _ => for { channel <- ref.get - response <- Sync[F].blocking(channel.insertRows(rows.map(_.asJava).asJava, null)) + response <- + SnowflakeRetrying.retryIndefinitely(snowflakeHealth)(Sync[F].blocking(channel.insertRows(rows.map(_.asJava).asJava, null))) _ <- flushChannel[F](channel) isValid <- Sync[F].delay(channel.isValid) } yield if (isValid) WriteResult.WriteFailures(parseResponse(response)) else WriteResult.ChannelIsInvalid diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala index a44b828..ca795e3 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala @@ -223,7 +223,7 @@ object Processing { batch.pure[F] else Sync[F].untilDefinedM { - env.snowflakeRunner.run(env.channelProvider.write(batch.toBeInserted.asIterable.map(_._2))).flatMap { + env.channelProvider.write(batch.toBeInserted.asIterable.map(_._2)).flatMap { case ChannelProvider.WriteResult.ChannelIsInvalid => // Reset the channel and immediately try again env.channelProvider.reset.as(none) @@ -339,7 +339,7 @@ object Processing { ().pure[F] else env.channelProvider.withClosedChannel { - env.snowflakeRunner.run(env.tblManager.addColumns(extraColsRequired.toList)) + env.tblManager.addColumns(extraColsRequired.toList) } private def sendFailedEvents[F[_]: Applicative, A](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] = diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeActionRunner.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala similarity index 76% rename from modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeActionRunner.scala rename to modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala index 4ccc716..a7564f3 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeActionRunner.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala @@ -10,20 +10,13 @@ import retry.implicits.retrySyntaxError import scala.concurrent.duration.DurationInt -trait SnowflakeActionRunner[F[_]] { - def run[A](action: F[A]): F[A] -} - -object SnowflakeActionRunner { +object SnowflakeRetrying { private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] - def retryingIndefinitely[F[_]: Sync: Sleep](snowflakeHealth: SnowflakeHealth[F]): SnowflakeActionRunner[F] = - new SnowflakeActionRunner[F] { - override def run[A](action: F[A]): F[A] = - retryUntilSuccessful(snowflakeHealth, action) <* - snowflakeHealth.setHealthy() - } + def retryIndefinitely[F[_]: Sync: Sleep, A](snowflakeHealth: SnowflakeHealth[F])(action: F[A]): F[A] = + retryUntilSuccessful(snowflakeHealth, action) <* + snowflakeHealth.setHealthy() private def retryUntilSuccessful[F[_]: Sync: Sleep, A](snowflakeHealth: SnowflakeHealth[F], action: F[A]) = action.retryingOnSomeErrors( @@ -42,4 +35,5 @@ object SnowflakeActionRunner { true case _ => true } + } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala index 4372703..0d08e7d 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala @@ -31,10 +31,11 @@ object TableManager { def fromTransactor[F[_]: Async]( config: Config.Snowflake, + snowflakeHealth: SnowflakeHealth[F], xa: Transactor[F] ): TableManager[F] = new TableManager[F] { - def addColumns(columns: List[String]): F[Unit] = + def addColumns(columns: List[String]): F[Unit] = SnowflakeRetrying.retryIndefinitely(snowflakeHealth) { Logger[F].info(s"Altering table to add columns [${columns.mkString(", ")}]") *> xa.rawTrans.apply { columns.traverse_ { col => @@ -45,6 +46,7 @@ object TableManager { } } } + } } private val reUnstruct: Regex = "^unstruct_event_.*$".r diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala index c7131b0..989edbf 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala @@ -14,7 +14,7 @@ import fs2.Stream import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents} import com.snowplowanalytics.snowplow.sinks.Sink -import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeActionRunner, TableManager} +import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, TableManager} import com.snowplowanalytics.snowplow.runtime.AppInfo import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} @@ -60,7 +60,6 @@ object MockEnvironment { tblManager = testTableManager(state), channelProvider = channelProvider, metrics = testMetrics(state), - snowflakeRunner = testSnowflakeRunner(), batching = Config.Batching( maxBytes = 16000000, maxDelay = 10.seconds, @@ -159,8 +158,4 @@ object MockEnvironment { def report: Stream[IO, Nothing] = Stream.never[IO] } - - private def testSnowflakeRunner() = new SnowflakeActionRunner[IO] { - override def run[A](action: IO[A]): IO[A] = action - } }