Draft Monitoring trait
pondzix committed Dec 13, 2023
1 parent ece3d95 commit 78aea99
Showing 9 changed files with 334 additions and 262 deletions.
@@ -8,14 +8,12 @@
package com.snowplowanalytics.snowplow.snowflake

import cats.effect.unsafe.implicits.global
import cats.effect.{Async, Resource, Sync}
import cats.implicits._
import cats.effect.{Async, Resource}
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.runtime.{AppInfo, HealthProbe}
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeHealth, SnowflakeRetrying, TableManager}
import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeHealth, TableManager}
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import io.sentry.Sentry
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.client.Client

@@ -24,11 +22,12 @@ case class Environment[F[_]](
source: SourceAndAck[F],
badSink: Sink[F],
httpClient: Client[F],
tblManager: TableManager[F],
tableManager: TableManager[F],
channelProvider: ChannelProvider[F],
metrics: Metrics[F],
batching: Config.Batching,
schemasToSkip: List[SchemaCriterion]
schemasToSkip: List[SchemaCriterion],
monitoring: Monitoring[F]
)

object Environment {
@@ -46,45 +45,22 @@ object Environment {
config.monitoring.healthProbe.port,
AppHealth.isHealthy(config.monitoring.healthProbe, sourceAndAck, snowflakeHealth)
)
_ <- enableSentry[F](appInfo, config.monitoring.sentry)
monitoring <- Monitoring.create[F](appInfo, config.monitoring)
httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
badSink <- toSink(config.output.bad)
metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
xa <- Resource.eval(SQLUtils.transactor[F](config.output.good))
_ <- Resource.eval(SnowflakeRetrying.retryIndefinitely(snowflakeHealth, config.retries)(SQLUtils.createTable(config.output.good, xa)))
tblManager = TableManager.fromTransactor(config.output.good, xa, snowflakeHealth, config.retries)
tableManager <- Resource.eval(TableManager.make(config.output.good, snowflakeHealth, config.retries, monitoring))
channelProvider <- ChannelProvider.make(config.output.good, snowflakeHealth, config.batching, config.retries)

} yield Environment(
appInfo = appInfo,
source = sourceAndAck,
badSink = badSink,
httpClient = httpClient,
tblManager = tblManager,
tableManager = tableManager,
channelProvider = channelProvider,
metrics = metrics,
batching = config.batching,
schemasToSkip = config.skipSchemas
schemasToSkip = config.skipSchemas,
monitoring = monitoring
)

private def enableSentry[F[_]: Sync](appInfo: AppInfo, config: Option[Config.Sentry]): Resource[F, Unit] =
config match {
case Some(c) =>
val acquire = Sync[F].delay {
Sentry.init { options =>
options.setDsn(c.dsn)
options.setRelease(appInfo.version)
c.tags.foreach { case (k, v) =>
options.setTag(k, v)
}
}
}

Resource.makeCase(acquire) {
case (_, Resource.ExitCase.Errored(e)) => Sync[F].delay(Sentry.captureException(e)).void
case _ => Sync[F].unit
}
case None =>
Resource.unit[F]
}
}
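
TableManager.make now receives the monitoring handle alongside the Snowflake health and retry config. The real TableManager internals are not part of this diff; the helper below is a minimal sketch, assuming a createEventsTable action, of how a failure could be surfaced through the FailedToCreateEventsTable alert defined in the new Monitoring trait further down.

package com.snowplowanalytics.snowplow.snowflake

import cats.effect.Sync
import cats.implicits._

// Illustrative only: `createEventsTable` stands in for the real SQL call inside TableManager.
object TableManagerAlertSketch {

  // Runs the table-creation action, raises a structured alert on failure,
  // and re-raises the error so existing retry/health handling still sees it.
  def createWithAlert[F[_]: Sync](monitoring: Monitoring[F], createEventsTable: F[Unit]): F[Unit] =
    createEventsTable.onError { case cause =>
      monitoring.alert(Monitoring.Alert.FailedToCreateEventsTable(cause))
    }
}

Kept this way, alerting stays orthogonal to the retry behaviour already wired through SnowflakeRetrying.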
@@ -0,0 +1,59 @@
package com.snowplowanalytics.snowplow.snowflake

import cats.effect.Resource
import cats.effect.kernel.Sync
import cats.implicits._
import com.snowplowanalytics.snowplow.runtime.AppInfo
import io.sentry.{Sentry, SentryOptions}

trait Monitoring[F[_]] {
def alert(message: Monitoring.Alert): F[Unit]
def reportFatal(exception: Throwable): F[Unit]
}

object Monitoring {

sealed trait Alert

object Alert {
final case class FailedToCreateEventsTable(cause: Throwable) extends Alert
// TODO add more!

}

def create[F[_]: Sync](appInfo: AppInfo, config: Config.Monitoring): Resource[F, Monitoring[F]] =
initSentry(appInfo, config.sentry).map { _ =>
new Monitoring[F] {
override def alert(message: Alert): F[Unit] =
Sync[F].unit // TODO

override def reportFatal(exception: Throwable): F[Unit] =
config.sentry.fold(Sync[F].unit) { _ =>
Sync[F].delay(Sentry.captureException(exception)).void
}

}
}

private def initSentry[F[_]: Sync](appInfo: AppInfo, config: Option[Config.Sentry]): Resource[F, Unit] =
config match {
case Some(sentryConfig) =>
Resource.make {
Sync[F].delay(Sentry.init(createSentryOptions(appInfo, sentryConfig)))
} { _ =>
Sync[F].delay(Sentry.close())
}
case None =>
Resource.unit[F]
}

private def createSentryOptions(appInfo: AppInfo, sentryConfig: Config.Sentry): SentryOptions = {
val options = new SentryOptions
options.setDsn(sentryConfig.dsn)
options.setRelease(appInfo.version)
sentryConfig.tags.foreach { case (k, v) =>
options.setTag(k, v)
}
options
}
}
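
Because Monitoring is an ordinary trait carried on the Environment, callers that do not need Sentry (unit tests, local runs) can supply a stub. A minimal no-op sketch, not part of this commit:

package com.snowplowanalytics.snowplow.snowflake

import cats.Applicative

// No-op Monitoring, e.g. for tests; discards alerts and fatal reports.
object NoopMonitoring {
  def apply[F[_]: Applicative]: Monitoring[F] =
    new Monitoring[F] {
      def alert(message: Monitoring.Alert): F[Unit] = Applicative[F].unit
      def reportFatal(exception: Throwable): F[Unit] = Applicative[F].unit
    }
}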
@@ -72,6 +72,9 @@ object Run {
.compile
.drain
.as(ExitCode.Success)
.onError { fatalAppError =>
env.monitoring.reportFatal(fatalAppError)
}
}

}
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.snowflake

import cats.effect.{Async, Sync}
import cats.implicits._
import doobie.Transactor
import net.snowflake.ingest.utils.{Utils => SnowflakeSdkUtils}

import java.security.PrivateKey
import java.util.Properties

object JdbcTransactor {

private val driver: String = "net.snowflake.client.jdbc.SnowflakeDriver"

def make[F[_]: Async](config: Config.Snowflake): F[Transactor[F]] =
for {
privateKey <- parsePrivateKey[F](config)
props = jdbcProperties(config, privateKey)
} yield Transactor.fromDriverManager[F](driver, config.url.getJdbcUrl, props, None)

private def parsePrivateKey[F[_]: Sync](config: Config.Snowflake): F[PrivateKey] =
Sync[F].delay { // Wrap in Sync because these can raise exceptions
config.privateKeyPassphrase match {
case Some(passphrase) =>
SnowflakeSdkUtils.parseEncryptedPrivateKey(config.privateKey, passphrase)
case None =>
SnowflakeSdkUtils.parsePrivateKey(config.privateKey)
}
}

private def jdbcProperties(config: Config.Snowflake, privateKey: PrivateKey): Properties = {
val props = new Properties()
props.setProperty("user", config.user)
props.put("privateKey", privateKey)
props.setProperty("timezone", "UTC")
config.role.foreach(props.setProperty("role", _))
props.put("loginTimeout", config.jdbcLoginTimeout.toSeconds.toInt)
props.put("networkTimeout", config.jdbcNetworkTimeout.toMillis.toInt)
props.put("queryTimeout", config.jdbcQueryTimeout.toSeconds.toInt)
props
}
}
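
For context, a minimal sketch of how the new JdbcTransactor might be exercised as a connectivity check; the connectivityCheck helper is illustrative and not part of this commit.

package com.snowplowanalytics.snowplow.snowflake

import cats.effect.IO
import doobie.implicits._

// Illustrative connectivity check against the configured Snowflake account.
object JdbcTransactorSketch {

  // Builds a transactor from the parsed Snowflake config and runs a trivial query.
  def connectivityCheck(config: Config.Snowflake): IO[Int] =
    JdbcTransactor.make[IO](config).flatMap { xa =>
      sql"select 1".query[Int].unique.transact(xa)
    }
}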
@@ -19,7 +19,6 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.nio.charset.StandardCharsets
import java.time.OffsetDateTime

import com.snowplowanalytics.iglu.schemaddl.parquet.Caster
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Processor => BadRowProcessor}
@@ -37,7 +36,9 @@ object Processing {

def stream[F[_]: Async](env: Environment[F]): Stream[F, Nothing] = {
val eventProcessingConfig = EventProcessingConfig(EventProcessingConfig.NoWindowing)
env.source.stream(eventProcessingConfig, eventProcessor(env))

Stream.eval(env.tableManager.initializeEventsTable()) >>
env.source.stream(eventProcessingConfig, eventProcessor(env))
}

/** Model used between stages of the processing pipeline */
@@ -339,7 +340,7 @@ object Processing {
().pure[F]
else
env.channelProvider.withClosedChannel {
env.tblManager.addColumns(extraColsRequired.toList)
env.tableManager.addColumns(extraColsRequired.toList)
}

private def sendFailedEvents[F[_]: Applicative, A](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
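
The Processing change sequences a one-off initializeEventsTable() call before the source is consumed. Below is a standalone sketch of that Stream.eval(...) >> stream ordering; the names are illustrative, not the application's own code.

import cats.effect.IO
import fs2.Stream

object StreamOrderingSketch {

  // `Stream.eval(setup) >> events` evaluates `setup` exactly once and only then
  // begins pulling elements from `events`, mirroring how the events table is
  // now initialized before the source stream is consumed.
  def setupThenProcess(setup: IO[Unit], events: Stream[IO, String]): Stream[IO, Unit] =
    (Stream.eval(setup) >> events)
      .evalMap(e => IO.println(s"processing $e"))
}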
