Extend app health with bad sink

pondzix committed Jan 9, 2024
1 parent 8127480 commit 17e42ca

Showing 10 changed files with 340 additions and 205 deletions.
AppHealth.scala
@@ -1,26 +1,41 @@
 package com.snowplowanalytics.snowplow.snowflake
 
+import cats.effect.{Concurrent, Ref}
 import cats.implicits._
-import cats.{Functor, Monad, Monoid}
+import cats.{Monad, Monoid, Show}
 import com.snowplowanalytics.snowplow.runtime.HealthProbe
 import com.snowplowanalytics.snowplow.runtime.HealthProbe.{Healthy, Unhealthy}
-import com.snowplowanalytics.snowplow.snowflake.processing.SnowflakeHealth
 import com.snowplowanalytics.snowplow.sources.SourceAndAck
 
-object AppHealth {
+import scala.concurrent.duration.FiniteDuration
 
-  def isHealthy[F[_]: Monad](
-    config: Config.HealthProbe,
-    source: SourceAndAck[F],
-    snowflakeHealth: SnowflakeHealth[F]
-  ): F[HealthProbe.Status] =
-    List(
-      sourceIsHealthy(config, source),
-      snowflakeHealth.state.get
-    ).foldA
+final class AppHealth[F[_]: Monad](
+  unhealthyLatency: FiniteDuration,
+  source: SourceAndAck[F],
+  appManagedServices: Ref[F, Map[AppHealth.Service, Boolean]]
+) {
 
-  private def sourceIsHealthy[F[_]: Functor](config: Config.HealthProbe, source: SourceAndAck[F]): F[HealthProbe.Status] =
-    source.isHealthy(config.unhealthyLatency).map {
+  def status(): F[HealthProbe.Status] =
+    for {
+      sourceHealth <- getSourceHealth
+      servicesHealth <- getAppManagedServicesHealth
+    } yield (sourceHealth :: servicesHealth).combineAll
+
+  def setServiceHealth(service: AppHealth.Service, isHealthy: Boolean): F[Unit] =
+    appManagedServices.update { currentHealth =>
+      currentHealth.updated(service, isHealthy)
+    }
+
+  private def getAppManagedServicesHealth: F[List[HealthProbe.Status]] =
+    appManagedServices.get.map { services =>
+      services.map {
+        case (service, false) => HealthProbe.Unhealthy(show"$service is not healthy")
+        case _ => HealthProbe.Healthy
+      }.toList
+    }
+
+  private def getSourceHealth: F[HealthProbe.Status] =
+    source.isHealthy(unhealthyLatency).map {
       case SourceAndAck.Healthy => HealthProbe.Healthy
       case unhealthy: SourceAndAck.Unhealthy => HealthProbe.Unhealthy(unhealthy.show)
     }
@@ -34,3 +49,27 @@ object AppHealth {
 
   private implicit val healthMonoid: Monoid[HealthProbe.Status] = Monoid.instance(Healthy, combineHealth)
 }
+
+object AppHealth {
+
+  sealed trait Service
+
+  object Service {
+    case object Snowflake extends Service
+    case object BadSink extends Service
+
+    implicit val show: Show[Service] = Show.show {
+      case Snowflake => "Snowflake"
+      case BadSink => "Bad sink"
+    }
+  }
+
+  def init[F[_]: Concurrent](
+    unhealthyLatency: FiniteDuration,
+    source: SourceAndAck[F],
+    initialHealth: Map[AppHealth.Service, Boolean]
+  ): F[AppHealth[F]] =
+    Ref
+      .of[F, Map[AppHealth.Service, Boolean]](initialHealth)
+      .map(appManaged => new AppHealth[F](unhealthyLatency, source, appManaged))
+}
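Note: status() folds the source status together with one status per app-managed service via the Monoid[HealthProbe.Status] above. Since combineHealth itself sits in a collapsed hunk, the following is a minimal standalone sketch of the assumed semantics (Healthy as the identity, any Unhealthy dominating); Status, Healthy and Unhealthy here are simplified stand-ins, not the real snowplow-runtime types.

import cats.Monoid
import cats.implicits._

object HealthFoldSketch extends App {

  sealed trait Status
  case object Healthy extends Status
  final case class Unhealthy(reason: String) extends Status

  // Assumed combine: Healthy is the identity, the first Unhealthy wins.
  implicit val statusMonoid: Monoid[Status] =
    Monoid.instance[Status](
      Healthy,
      (a, b) =>
        (a, b) match {
          case (u: Unhealthy, _) => u
          case (_, other)        => other
        }
    )

  // One failing service turns the whole probe unhealthy:
  val statuses: List[Status] = List(Healthy, Unhealthy("Bad sink is not healthy"), Healthy)
  println(statuses.combineAll) // Unhealthy(Bad sink is not healthy)
}

Under these semantics a single unhealthy service is enough to fail the whole probe, which is what lets a broken bad sink surface through the existing health endpoint without changing HealthProbe itself.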
Environment.scala
@@ -12,7 +12,8 @@ import cats.effect.{Async, Resource}
 import com.snowplowanalytics.iglu.core.SchemaCriterion
 import com.snowplowanalytics.snowplow.runtime.{AppInfo, HealthProbe}
 import com.snowplowanalytics.snowplow.sinks.Sink
-import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, SnowflakeHealth, TableManager}
+import com.snowplowanalytics.snowplow.snowflake.AppHealth.Service
+import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, TableManager}
 import com.snowplowanalytics.snowplow.sources.SourceAndAck
 import org.http4s.blaze.client.BlazeClientBuilder
 import org.http4s.client.Client
@@ -25,13 +26,19 @@ case class Environment[F[_]](
   tableManager: TableManager[F],
   channel: Channel.Provider[F],
   metrics: Metrics[F],
+  appHealth: AppHealth[F],
   batching: Config.Batching,
   schemasToSkip: List[SchemaCriterion],
   badRowMaxSize: Int
 )
 
 object Environment {
 
+  private val initialAppHealth: Map[Service, Boolean] = Map(
+    Service.Snowflake -> false,
+    Service.BadSink -> true
+  )
+
   def fromConfig[F[_]: Async, SourceConfig, SinkConfig](
     config: Config[SourceConfig, SinkConfig],
     appInfo: AppInfo,
@@ -40,19 +47,19 @@
   ): Resource[F, Environment[F]] =
     for {
       _ <- Sentry.capturingAnyException(appInfo, config.monitoring.sentry)
-      snowflakeHealth <- Resource.eval(SnowflakeHealth.initUnhealthy[F])
       sourceAndAck <- Resource.eval(toSource(config.input))
+      appHealth <- Resource.eval(AppHealth.init(config.monitoring.healthProbe.unhealthyLatency, sourceAndAck, initialAppHealth))
       _ <- HealthProbe.resource(
              config.monitoring.healthProbe.port,
-             AppHealth.isHealthy(config.monitoring.healthProbe, sourceAndAck, snowflakeHealth)
+             appHealth.status()
            )
       httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
       monitoring <- Monitoring.create[F](config.monitoring.webhook, appInfo, httpClient)
       badSink <- toSink(config.output.bad.sink)
       metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
-      tableManager <- Resource.eval(TableManager.make(config.output.good, snowflakeHealth, config.retries, monitoring))
+      tableManager <- Resource.eval(TableManager.make(config.output.good, appHealth, config.retries, monitoring))
       channelOpener <- Channel.opener(config.output.good, config.batching)
-      channelProvider <- Channel.provider(channelOpener, config.retries, snowflakeHealth, monitoring)
+      channelProvider <- Channel.provider(channelOpener, config.retries, appHealth, monitoring)
     } yield Environment(
       appInfo = appInfo,
       source = sourceAndAck,
@@ -61,6 +68,7 @@
       tableManager = tableManager,
       channel = channelProvider,
       metrics = metrics,
+      appHealth = appHealth,
       batching = config.batching,
       schemasToSkip = config.skipSchemas,
       badRowMaxSize = config.output.bad.maxRecordSize
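Note: initialAppHealth encodes a deliberate asymmetry: Snowflake starts unhealthy (false) until the first warehouse command succeeds, while the bad sink starts healthy (true) and is only flagged when a write actually fails. A minimal sketch of the Ref-backed state this drives, using plain cats-effect; ServiceHealthSketch and its Service values are illustrative names, not the loader's API.

import cats.effect.{IO, IOApp, Ref}

object ServiceHealthSketch extends IOApp.Simple {

  sealed trait Service
  case object Snowflake extends Service
  case object BadSink extends Service

  def run: IO[Unit] =
    for {
      // Pessimistic about Snowflake, optimistic about the bad sink
      ref <- Ref.of[IO, Map[Service, Boolean]](Map(Snowflake -> false, BadSink -> true))
      _ <- ref.update(_.updated(Snowflake, true)) // e.g. after the first successful warehouse command
      state <- ref.get
      _ <- IO.println(state) // Map(Snowflake -> true, BadSink -> true)
    } yield ()
}

Starting Snowflake as unhealthy means the probe only reports healthy once the loader has proven it can reach the warehouse; the bad sink is trusted until a write fails.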
Channel.scala
@@ -9,7 +9,7 @@ package com.snowplowanalytics.snowplow.snowflake.processing
 
 import cats.effect.{Async, Poll, Resource, Sync}
 import cats.implicits._
-import com.snowplowanalytics.snowplow.snowflake.{Alert, Config, Monitoring}
+import com.snowplowanalytics.snowplow.snowflake.{Alert, AppHealth, Config, Monitoring}
 import net.snowflake.ingest.streaming.internal.SnowsFlakePlowInterop
 import net.snowflake.ingest.streaming._
 import net.snowflake.ingest.utils.{ErrorCode => SFErrorCode, ParameterProvider, SFException}
@@ -105,15 +105,15 @@ object Channel {
   def provider[F[_]: Async](
     opener: Opener[F],
     retries: Config.Retries,
-    health: SnowflakeHealth[F],
+    health: AppHealth[F],
     monitoring: Monitoring[F]
   ): Resource[F, Provider[F]] =
     Coldswap.make(openerToResource(opener, retries, health, monitoring))
 
   private def openerToResource[F[_]: Async](
     opener: Opener[F],
     retries: Config.Retries,
-    health: SnowflakeHealth[F],
+    health: AppHealth[F],
     monitoring: Monitoring[F]
   ): Resource[F, Channel[F]] = {
 
Processing.scala
@@ -25,7 +25,7 @@ import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Pr
 import com.snowplowanalytics.snowplow.badrows.Payload.{RawPayload => BadRowRawPayload}
 import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, TokenedEvents}
 import com.snowplowanalytics.snowplow.sinks.ListOfList
-import com.snowplowanalytics.snowplow.snowflake.{Environment, Metrics}
+import com.snowplowanalytics.snowplow.snowflake.{AppHealth, Environment, Metrics}
 import com.snowplowanalytics.snowplow.runtime.syntax.foldable._
 import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
 import com.snowplowanalytics.snowplow.loaders.transform.{BadRowsSerializer, Transform}
@@ -216,23 +216,30 @@
     batch: BatchAfterTransform
   )(
     handleFailures: List[Channel.WriteFailure] => F[BatchAfterTransform]
-  ): F[BatchAfterTransform] =
-    if (batch.toBeInserted.isEmpty)
-      batch.pure[F]
-    else
-      Sync[F].untilDefinedM {
-        env.channel.opened
-          .use { channel =>
-            channel.write(batch.toBeInserted.asIterable.map(_._2))
-          }
-          .flatMap {
-            case Channel.WriteResult.ChannelIsInvalid =>
-              // Reset the channel and immediately try again
-              env.channel.closed.use_.as(none)
-            case Channel.WriteResult.WriteFailures(notWritten) =>
-              handleFailures(notWritten).map(Some(_))
-          }
-      }
+  ): F[BatchAfterTransform] = {
+    val attempt: F[BatchAfterTransform] =
+      if (batch.toBeInserted.isEmpty)
+        batch.pure[F]
+      else
+        Sync[F].untilDefinedM {
+          env.channel.opened
+            .use { channel =>
+              channel.write(batch.toBeInserted.asIterable.map(_._2))
+            }
+            .flatMap {
+              case Channel.WriteResult.ChannelIsInvalid =>
+                // Reset the channel and immediately try again
+                env.channel.closed.use_.as(none)
+              case Channel.WriteResult.WriteFailures(notWritten) =>
+                handleFailures(notWritten).map(Some(_))
+            }
+        }
+
+    attempt
+      .onError { _ =>
+        env.appHealth.setServiceHealth(AppHealth.Service.Snowflake, isHealthy = false)
+      }
+  }
 
   /**
    * First attempt to write events with the Snowflake SDK
@@ -344,15 +351,19 @@
         env.tableManager.addColumns(extraColsRequired.toList)
       }
 
-  private def sendFailedEvents[F[_]: Applicative](
+  private def sendFailedEvents[F[_]: Sync](
     env: Environment[F],
     badRowProcessor: BadRowProcessor
   ): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
     _.evalTap { batch =>
       if (batch.badAccumulated.nonEmpty) {
         val serialized =
           batch.badAccumulated.mapUnordered(badRow => BadRowsSerializer.withMaxSize(badRow, badRowProcessor, env.badRowMaxSize))
-        env.badSink.sinkSimple(serialized)
+        env.badSink
+          .sinkSimple(serialized)
+          .onError { _ =>
+            env.appHealth.setServiceHealth(AppHealth.Service.BadSink, isHealthy = false)
+          }
       } else Applicative[F].unit
     }
 
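Note: both new hooks rely on onError running the handler for its effect and then re-raising, so flagging a service unhealthy never swallows the original error; the failed write still propagates into the retry and alerting paths. A small sketch of that behavior in plain cats-effect, with a Ref standing in for AppHealth:

import cats.effect.{IO, IOApp, Ref}
import cats.implicits._

object OnErrorSketch extends IOApp.Simple {

  def run: IO[Unit] =
    for {
      healthy <- Ref.of[IO, Boolean](true)
      result <- IO.raiseError[Unit](new RuntimeException("sink write failed"))
                  .onError(_ => healthy.set(false)) // record the failure...
                  .attempt                          // ...while the error itself is re-raised
      flag <- healthy.get
      _ <- IO.println(s"result=$result, healthy=$flag") // result=Left(...), healthy=false
    } yield ()
}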

This file was deleted.

SnowflakeRetrying.scala
@@ -2,7 +2,7 @@ package com.snowplowanalytics.snowplow.snowflake.processing
 
 import cats.effect.Sync
 import cats.implicits._
-import com.snowplowanalytics.snowplow.snowflake.Config
+import com.snowplowanalytics.snowplow.snowflake.{AppHealth, Config}
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 import retry._
@@ -12,17 +12,17 @@ object SnowflakeRetrying {
 
   private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]
 
-  def retryIndefinitely[F[_]: Sync: Sleep, A](snowflakeHealth: SnowflakeHealth[F], config: Config.Retries)(action: F[A]): F[A] =
-    retryUntilSuccessful(snowflakeHealth, config, action) <*
-      snowflakeHealth.setHealthy()
+  def retryIndefinitely[F[_]: Sync: Sleep, A](appHealth: AppHealth[F], config: Config.Retries)(action: F[A]): F[A] =
+    retryUntilSuccessful(appHealth, config, action) <*
+      appHealth.setServiceHealth(AppHealth.Service.Snowflake, isHealthy = true)
 
   private def retryUntilSuccessful[F[_]: Sync: Sleep, A](
-    snowflakeHealth: SnowflakeHealth[F],
+    appHealth: AppHealth[F],
     config: Config.Retries,
     action: F[A]
   ): F[A] =
     action
-      .onError(_ => snowflakeHealth.setUnhealthy())
+      .onError(_ => appHealth.setServiceHealth(AppHealth.Service.Snowflake, isHealthy = false))
       .retryingOnAllErrors(
         policy = RetryPolicies.exponentialBackoff[F](config.backoff),
         onError = (error, details) => Logger[F].error(error)(s"Executing Snowflake command failed. ${extractRetryDetails(details)}")
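Note: retryIndefinitely now owns the full health lifecycle for the Snowflake service: every failed attempt flags it unhealthy, the exponential-backoff policy retries forever, and the <* only flips it healthy again once an attempt succeeds. A self-contained sketch of that shape, assuming cats-retry with its syntax import in scope (as the file above appears to have) and a Ref in place of AppHealth; the 100ms base delay and all names here are placeholders.

import cats.effect.{IO, Ref}
import cats.implicits._
import retry._
import retry.syntax.all._

import scala.concurrent.duration._

object RetrySketch {

  def retryIndefinitely[A](healthy: Ref[IO, Boolean])(action: IO[A]): IO[A] =
    action
      .onError(_ => healthy.set(false)) // every failure marks the service unhealthy
      .retryingOnAllErrors(
        policy = RetryPolicies.exponentialBackoff[IO](100.millis), // backs off but never gives up
        onError = (e: Throwable, d: RetryDetails) => IO.println(s"Retrying: ${e.getMessage} ($d)")
      ) <* healthy.set(true) // reached only once an attempt succeeds
}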
TableManager.scala
@@ -9,7 +9,7 @@ package com.snowplowanalytics.snowplow.snowflake.processing
 
 import cats.effect.{Async, Sync}
 import cats.implicits._
-import com.snowplowanalytics.snowplow.snowflake.{Alert, Config, JdbcTransactor, Monitoring}
+import com.snowplowanalytics.snowplow.snowflake.{Alert, AppHealth, Config, JdbcTransactor, Monitoring}
 import doobie.implicits._
 import doobie.{ConnectionIO, Fragment}
 import net.snowflake.client.jdbc.SnowflakeSQLException
@@ -32,15 +32,15 @@ object TableManager {
 
   def make[F[_]: Async](
     config: Config.Snowflake,
-    snowflakeHealth: SnowflakeHealth[F],
+    appHealth: AppHealth[F],
     retriesConfig: Config.Retries,
     monitoring: Monitoring[F]
   ): F[TableManager[F]] =
     JdbcTransactor.make(config).map { transactor =>
       new TableManager[F] {
 
         override def initializeEventsTable(): F[Unit] =
-          SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) {
+          SnowflakeRetrying.retryIndefinitely(appHealth, retriesConfig) {
             Logger[F].info(s"Opening JDBC connection to ${config.url.getJdbcUrl}") *>
               executeInitTableQuery()
                 .onError { cause =>
@@ -49,7 +49,7 @@
                 }
 
         override def addColumns(columns: List[String]): F[Unit] =
-          SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) {
+          SnowflakeRetrying.retryIndefinitely(appHealth, retriesConfig) {
            Logger[F].info(s"Altering table to add columns [${columns.mkString(", ")}]") *>
              executeAddColumnsQuery(columns)
                .onError { cause =>