Skip to content

Commit

Permalink
Alternative approach!
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Dec 4, 2023
1 parent b8d60aa commit 6ebcb9f
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import cats.implicits._
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.runtime.{AppInfo, HealthProbe}
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeActionRunner, SnowflakeHealth, TableManager}
import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeHealth, SnowflakeRetrying, TableManager}
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import io.sentry.Sentry
import org.http4s.blaze.client.BlazeClientBuilder
Expand All @@ -27,7 +27,6 @@ case class Environment[F[_]](
tblManager: TableManager[F],
channelProvider: ChannelProvider[F],
metrics: Metrics[F],
snowflakeRunner: SnowflakeActionRunner[F],
batching: Config.Batching,
schemasToSkip: List[SchemaCriterion]
)
Expand All @@ -42,15 +41,14 @@ object Environment {
): Resource[F, Environment[F]] =
for {
snowflakeHealth <- Resource.eval(SnowflakeHealth.initHealthy[F])
snowflakeRunner = SnowflakeActionRunner.retryingIndefinitely[F](snowflakeHealth)
_ <- enableSentry[F](appInfo, config.monitoring.sentry)
httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
badSink <- toSink(config.output.bad)
metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
xa <- Resource.eval(SQLUtils.transactor[F](config.output.good))
_ <- Resource.eval(snowflakeRunner.run(SQLUtils.createTable(config.output.good, xa)))
tblManager = TableManager.fromTransactor(config.output.good, xa)
channelProvider <- ChannelProvider.make(config.output.good, config.batching)
_ <- Resource.eval(SnowflakeRetrying.retryIndefinitely(snowflakeHealth)(SQLUtils.createTable(config.output.good, xa)))
tblManager = TableManager.fromTransactor(config.output.good, snowflakeHealth, xa)
channelProvider <- ChannelProvider.make(config.output.good, snowflakeHealth, config.batching)
sourceAndAck <- Resource.eval(toSource(config.input))
_ <- HealthProbe.resource(
config.monitoring.healthProbe.port,
Expand All @@ -64,7 +62,6 @@ object Environment {
tblManager = tblManager,
channelProvider = channelProvider,
metrics = metrics,
snowflakeRunner = snowflakeRunner,
batching = config.batching,
schemasToSkip = config.skipSchemas
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,25 @@ object ChannelProvider {
/** A large number so we don't limit the number of permits for calls to `flush` and `enqueue` */
private val allAvailablePermits: Long = Long.MaxValue

def make[F[_]: Async](config: Config.Snowflake, batchingConfig: Config.Batching): Resource[F, ChannelProvider[F]] =
def make[F[_]: Async](
config: Config.Snowflake,
snowflakeHealth: SnowflakeHealth[F],
batchingConfig: Config.Batching
): Resource[F, ChannelProvider[F]] =
for {
client <- createClient(config, batchingConfig)
hs <- Hotswap.create[F, SnowflakeStreamingIngestChannel]
channel <- Resource.eval(hs.swap(createChannel(config, client)))
ref <- Resource.eval(Ref[F].of(channel))
sem <- Resource.eval(Semaphore[F](allAvailablePermits))
} yield impl(ref, hs, sem, createChannel(config, client))
} yield impl(ref, hs, sem, createChannel(config, client), snowflakeHealth)

private def impl[F[_]: Async](
ref: Ref[F, SnowflakeStreamingIngestChannel],
hs: Hotswap[F, SnowflakeStreamingIngestChannel],
sem: Semaphore[F],
next: Resource[F, SnowflakeStreamingIngestChannel]
next: Resource[F, SnowflakeStreamingIngestChannel],
snowflakeHealth: SnowflakeHealth[F]
): ChannelProvider[F] =
new ChannelProvider[F] {
def reset: F[Unit] =
Expand Down Expand Up @@ -158,7 +163,8 @@ object ChannelProvider {
.use[WriteResult] { _ =>
for {
channel <- ref.get
response <- Sync[F].blocking(channel.insertRows(rows.map(_.asJava).asJava, null))
response <-
SnowflakeRetrying.retryIndefinitely(snowflakeHealth)(Sync[F].blocking(channel.insertRows(rows.map(_.asJava).asJava, null)))
_ <- flushChannel[F](channel)
isValid <- Sync[F].delay(channel.isValid)
} yield if (isValid) WriteResult.WriteFailures(parseResponse(response)) else WriteResult.ChannelIsInvalid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ object Processing {
batch.pure[F]
else
Sync[F].untilDefinedM {
env.snowflakeRunner.run(env.channelProvider.write(batch.toBeInserted.asIterable.map(_._2))).flatMap {
env.channelProvider.write(batch.toBeInserted.asIterable.map(_._2)).flatMap {
case ChannelProvider.WriteResult.ChannelIsInvalid =>
// Reset the channel and immediately try again
env.channelProvider.reset.as(none)
Expand Down Expand Up @@ -339,7 +339,7 @@ object Processing {
().pure[F]
else
env.channelProvider.withClosedChannel {
env.snowflakeRunner.run(env.tblManager.addColumns(extraColsRequired.toList))
env.tblManager.addColumns(extraColsRequired.toList)
}

private def sendFailedEvents[F[_]: Applicative, A](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,13 @@ import retry.implicits.retrySyntaxError

import scala.concurrent.duration.DurationInt

trait SnowflakeActionRunner[F[_]] {
def run[A](action: F[A]): F[A]
}

object SnowflakeActionRunner {
object SnowflakeRetrying {

/**
 * Implicit log4cats logger made available inside this object, built with
 * `Slf4jLogger.getLogger` for any effect type `F` that has a `Sync` instance.
 *
 * The result type is written out explicitly: implicit definitions with an inferred type
 * are fragile during implicit resolution and are rejected by Scala 3.
 *
 * NOTE(review): the fully qualified type assumes log4cats 2.x (`org.typelevel.log4cats`),
 * which is what a cats-effect 3 code base uses -- confirm against this file's imports.
 */
private implicit def logger[F[_]: Sync]: org.typelevel.log4cats.SelfAwareStructuredLogger[F] =
  Slf4jLogger.getLogger[F]

def retryingIndefinitely[F[_]: Sync: Sleep](snowflakeHealth: SnowflakeHealth[F]): SnowflakeActionRunner[F] =
new SnowflakeActionRunner[F] {
override def run[A](action: F[A]): F[A] =
retryUntilSuccessful(snowflakeHealth, action) <*
snowflakeHealth.setHealthy()
}
/**
 * Runs `action` through `retryUntilSuccessful` and, once that has produced a value,
 * marks `snowflakeHealth` as healthy before handing the value back.
 *
 * @param snowflakeHealth health state passed to the retry helper and flipped to healthy
 *                        via `setHealthy()` after the action has succeeded
 * @param action          the effect to run
 * @return the value produced by `action`; the result of `setHealthy()` is discarded
 */
def retryIndefinitely[F[_]: Sync: Sleep, A](snowflakeHealth: SnowflakeHealth[F])(action: F[A]): F[A] = {
  val attempted = retryUntilSuccessful(snowflakeHealth, action)
  val markHealthy = snowflakeHealth.setHealthy()
  // `productL` is the named form of `<*`: sequence both effects, keep the left-hand result.
  Sync[F].productL(attempted)(markHealthy)
}

private def retryUntilSuccessful[F[_]: Sync: Sleep, A](snowflakeHealth: SnowflakeHealth[F], action: F[A]) =
action.retryingOnSomeErrors(
Expand All @@ -42,4 +35,5 @@ object SnowflakeActionRunner {
true
case _ => true
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ object TableManager {

def fromTransactor[F[_]: Async](
config: Config.Snowflake,
snowflakeHealth: SnowflakeHealth[F],
xa: Transactor[F]
): TableManager[F] = new TableManager[F] {

def addColumns(columns: List[String]): F[Unit] =
def addColumns(columns: List[String]): F[Unit] = SnowflakeRetrying.retryIndefinitely(snowflakeHealth) {
Logger[F].info(s"Altering table to add columns [${columns.mkString(", ")}]") *>
xa.rawTrans.apply {
columns.traverse_ { col =>
Expand All @@ -45,6 +46,7 @@ object TableManager {
}
}
}
}
}

private val reUnstruct: Regex = "^unstruct_event_.*$".r
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import fs2.Stream

import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeActionRunner, TableManager}
import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, TableManager}
import com.snowplowanalytics.snowplow.runtime.AppInfo

import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
Expand Down Expand Up @@ -60,7 +60,6 @@ object MockEnvironment {
tblManager = testTableManager(state),
channelProvider = channelProvider,
metrics = testMetrics(state),
snowflakeRunner = testSnowflakeRunner(),
batching = Config.Batching(
maxBytes = 16000000,
maxDelay = 10.seconds,
Expand Down Expand Up @@ -159,8 +158,4 @@ object MockEnvironment {

def report: Stream[IO, Nothing] = Stream.never[IO]
}

private def testSnowflakeRunner() = new SnowflakeActionRunner[IO] {
override def run[A](action: IO[A]): IO[A] = action
}
}

0 comments on commit 6ebcb9f

Please sign in to comment.