Skip to content

Commit

Permalink
Implement alerting using webhook (close #12)
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Dec 20, 2023
1 parent ece3d95 commit d149bd6
Show file tree
Hide file tree
Showing 16 changed files with 595 additions and 278 deletions.
10 changes: 10 additions & 0 deletions config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@
"myTag": "xyz"
}
}

# -- Report alerts to the webhook
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
}
}

# -- Optional, configure telemetry
Expand Down
10 changes: 10 additions & 0 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@
"myTag": "xyz"
}
}

# -- Report alerts to the webhook
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
}
}

# -- Optional, configure telemetry
Expand Down
10 changes: 10 additions & 0 deletions config/config.pubsub.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@
"myTag": "xyz"
}
}

# -- Report alerts to the webhook
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
}
}

# -- Optional, configure telemetry
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.snowplowanalytics.snowplow.snowflake

import cats.Show
import cats.implicits.showInterpolator
import com.snowplowanalytics.iglu.core.circe.implicits.igluNormalizeDataJson
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.snowplow.runtime.AppInfo
import io.circe.Json
import io.circe.syntax.EncoderOps

import java.sql.SQLException

sealed trait Alert
object Alert {

/** Restrict the length of an alert message to be compliant with alert iglu schema */
private val MaxAlertPayloadLength = 4096

final case class FailedToCreateEventsTable(cause: Throwable) extends Alert
final case class FailedToAddColumns(columns: List[String], cause: Throwable) extends Alert
final case class FailedToOpenSnowflakeChannel(cause: Throwable) extends Alert

def toSelfDescribingJson(
alert: Alert,
appInfo: AppInfo,
tags: Map[String, String]
): Json =
SelfDescribingData(
schema = SchemaKey("com.snowplowanalytics.monitoring.loader", "alert", "jsonschema", SchemaVer.Full(1, 0, 0)),
data = Json.obj(
"application" -> s"${appInfo.name}-${appInfo.version}".asJson,
"message" -> getMessage(alert).asJson,
"tags" -> tags.asJson
)
).normalize

private def getMessage(alert: Alert): String = {
val full = alert match {
case FailedToCreateEventsTable(cause) => show"Failed to create events table: $cause"
case FailedToAddColumns(columns, cause) => show"Failed to add columns: ${columns.mkString("[", ",", "]")}. Cause: $cause"
case FailedToOpenSnowflakeChannel(cause) => show"Failed to open Snowflake channel: $cause"
}

full.take(MaxAlertPayloadLength)
}

private implicit def throwableShow: Show[Throwable] = {
def go(acc: List[String], next: Throwable): String = {
val nextMessage = next match {
case t: SQLException => Some(s"${t.getMessage} = SqlState: ${t.getSQLState}")
case t => Option(t.getMessage)
}
val msgs = nextMessage.filterNot(msg => acc.headOption.contains(msg)) ++: acc

Option(next.getCause) match {
case Some(cause) => go(msgs, cause)
case None => msgs.reverse.mkString(": ")
}
}

Show.show(go(Nil, _))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package com.snowplowanalytics.snowplow.snowflake

import cats.Id
import cats.syntax.either._
import io.circe.Decoder
import io.circe.generic.extras.semiauto._
import io.circe.generic.extras.Configuration
Expand All @@ -21,6 +22,7 @@ import scala.concurrent.duration.FiniteDuration
import scala.util.Try
import com.snowplowanalytics.snowplow.runtime.{Metrics => CommonMetrics, Telemetry}
import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._
import org.http4s.{ParseFailure, Uri}

case class Config[+Source, +Sink](
input: Source,
Expand Down Expand Up @@ -73,9 +75,12 @@ object Config {
case class Monitoring(
metrics: Metrics,
sentry: Option[Sentry],
healthProbe: HealthProbe
healthProbe: HealthProbe,
webhook: Option[Webhook]
)

final case class Webhook(endpoint: Uri, tags: Map[String, String])

case class Retries(backoff: FiniteDuration)

implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
Expand All @@ -93,8 +98,12 @@ object Config {
case SentryM(None, _) =>
None
}
implicit val http4sUriDecoder: Decoder[Uri] =
Decoder[String].emap(s => Either.catchOnly[ParseFailure](Uri.unsafeFromString(s)).leftMap(_.toString))

implicit val metricsDecoder = deriveConfiguredDecoder[Metrics]
implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe]
implicit val webhookDecoder = deriveConfiguredDecoder[Webhook]
implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
implicit val retriesDecoder = deriveConfiguredDecoder[Retries]
deriveConfiguredDecoder[Config[Source, Sink]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@
package com.snowplowanalytics.snowplow.snowflake

import cats.effect.unsafe.implicits.global
import cats.effect.{Async, Resource, Sync}
import cats.implicits._
import cats.effect.{Async, Resource}
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.runtime.{AppInfo, HealthProbe}
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeHealth, SnowflakeRetrying, TableManager}
import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeHealth, TableManager}
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import io.sentry.Sentry
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.client.Client

Expand All @@ -24,7 +22,7 @@ case class Environment[F[_]](
source: SourceAndAck[F],
badSink: Sink[F],
httpClient: Client[F],
tblManager: TableManager[F],
tableManager: TableManager[F],
channelProvider: ChannelProvider[F],
metrics: Metrics[F],
batching: Config.Batching,
Expand All @@ -40,51 +38,29 @@ object Environment {
toSink: SinkConfig => Resource[F, Sink[F]]
): Resource[F, Environment[F]] =
for {
_ <- Sentry.capturingAnyException(appInfo, config.monitoring.sentry)
snowflakeHealth <- Resource.eval(SnowflakeHealth.initUnhealthy[F])
sourceAndAck <- Resource.eval(toSource(config.input))
_ <- HealthProbe.resource(
config.monitoring.healthProbe.port,
AppHealth.isHealthy(config.monitoring.healthProbe, sourceAndAck, snowflakeHealth)
)
_ <- enableSentry[F](appInfo, config.monitoring.sentry)
httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
monitoring <- Monitoring.create[F](config.monitoring.webhook, appInfo, httpClient)
badSink <- toSink(config.output.bad)
metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
xa <- Resource.eval(SQLUtils.transactor[F](config.output.good))
_ <- Resource.eval(SnowflakeRetrying.retryIndefinitely(snowflakeHealth, config.retries)(SQLUtils.createTable(config.output.good, xa)))
tblManager = TableManager.fromTransactor(config.output.good, xa, snowflakeHealth, config.retries)
channelProvider <- ChannelProvider.make(config.output.good, snowflakeHealth, config.batching, config.retries)

tableManager <- Resource.eval(TableManager.make(config.output.good, snowflakeHealth, config.retries, monitoring))
_ <- Resource.eval(tableManager.initializeEventsTable())
channelProvider <- ChannelProvider.make(config.output.good, snowflakeHealth, config.batching, config.retries, monitoring)
} yield Environment(
appInfo = appInfo,
source = sourceAndAck,
badSink = badSink,
httpClient = httpClient,
tblManager = tblManager,
tableManager = tableManager,
channelProvider = channelProvider,
metrics = metrics,
batching = config.batching,
schemasToSkip = config.skipSchemas
)

private def enableSentry[F[_]: Sync](appInfo: AppInfo, config: Option[Config.Sentry]): Resource[F, Unit] =
config match {
case Some(c) =>
val acquire = Sync[F].delay {
Sentry.init { options =>
options.setDsn(c.dsn)
options.setRelease(appInfo.version)
c.tags.foreach { case (k, v) =>
options.setTag(k, v)
}
}
}

Resource.makeCase(acquire) {
case (_, Resource.ExitCase.Errored(e)) => Sync[F].delay(Sentry.captureException(e)).void
case _ => Sync[F].unit
}
case None =>
Resource.unit[F]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.snowplowanalytics.snowplow.snowflake

import cats.effect.{Resource, Sync}
import cats.implicits._
import com.snowplowanalytics.snowplow.runtime.AppInfo
import org.http4s.circe.jsonEncoder
import org.http4s.client.Client
import org.http4s.{EntityDecoder, Method, Request}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

trait Monitoring[F[_]] {
def alert(message: Alert): F[Unit]
}

object Monitoring {

private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

def create[F[_]: Sync](
config: Option[Config.Webhook],
appInfo: AppInfo,
httpClient: Client[F]
)(implicit E: EntityDecoder[F, String]
): Resource[F, Monitoring[F]] = Resource.pure {
new Monitoring[F] {

override def alert(message: Alert): F[Unit] =
config match {
case Some(webhookConfig) =>
val request = buildHttpRequest(webhookConfig, message)
executeHttpRequest(webhookConfig, httpClient, request)
case None =>
Logger[F].debug("Webhook monitoring is not configured, skipping alert")
}

def buildHttpRequest(webhookConfig: Config.Webhook, alert: Alert): Request[F] =
Request[F](Method.POST, webhookConfig.endpoint)
.withEntity(Alert.toSelfDescribingJson(alert, appInfo, webhookConfig.tags))

def executeHttpRequest(
webhookConfig: Config.Webhook,
httpClient: Client[F],
request: Request[F]
): F[Unit] =
httpClient
.run(request)
.use { response =>
if (response.status.isSuccess) Sync[F].unit
else {
response.as[String].flatMap(body => Logger[F].error(s"Webhook ${webhookConfig.endpoint} returned non-2xx response:\n$body"))
}
}
.handleErrorWith { e =>
Logger[F].error(e)(s"Webhook ${webhookConfig.endpoint} resulted in exception without a response")
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.snowplowanalytics.snowplow.snowflake

import cats.effect.{Resource, Sync}
import cats.implicits.catsSyntaxApplyOps
import com.snowplowanalytics.snowplow.runtime.AppInfo
import io.sentry.{Sentry => JSentry, SentryOptions}

object Sentry {

def capturingAnyException[F[_]: Sync](appInfo: AppInfo, config: Option[Config.Sentry]): Resource[F, Unit] =
config match {
case Some(sentryConfig) =>
initSentry(appInfo, sentryConfig)
case None =>
Resource.unit[F]
}

private def initSentry[F[_]: Sync](appInfo: AppInfo, sentryConfig: Config.Sentry): Resource[F, Unit] = {
val acquire = Sync[F].delay(JSentry.init(createSentryOptions(appInfo, sentryConfig)))
val release = Sync[F].delay(JSentry.close())

Resource.makeCase(acquire) {
case (_, Resource.ExitCase.Errored(e)) => Sync[F].delay(JSentry.captureException(e)) *> release
case _ => release

}
}

private def createSentryOptions(appInfo: AppInfo, sentryConfig: Config.Sentry): SentryOptions = {
val options = new SentryOptions
options.setDsn(sentryConfig.dsn)
options.setRelease(appInfo.version)
sentryConfig.tags.foreach { case (k, v) =>
options.setTag(k, v)
}
options
}
}
Loading

0 comments on commit d149bd6

Please sign in to comment.