diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon index d71975f..2398c0d 100644 --- a/config/config.kinesis.reference.hocon +++ b/config/config.kinesis.reference.hocon @@ -144,6 +144,15 @@ "myTag": "xyz" } } + + "webhook": { + # An actual HTTP endpoint + "endpoint": "https://webhook.acme.com", + # Set of arbitrary key-value pairs attached to the payload + "tags": { + "pipeline": "production" + } + } } # -- Optional, configure telemetry diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Alert.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Alert.scala new file mode 100644 index 0000000..8b01e59 --- /dev/null +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Alert.scala @@ -0,0 +1,63 @@ +package com.snowplowanalytics.snowplow.snowflake + +import cats.Show +import cats.implicits.showInterpolator +import com.snowplowanalytics.iglu.core.circe.implicits.igluNormalizeDataJson +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} +import com.snowplowanalytics.snowplow.runtime.AppInfo +import io.circe.Json +import io.circe.syntax.EncoderOps + +import java.sql.SQLException + +sealed trait Alert +object Alert { + + /** Restrict the length of an alert message to be compliant with alert iglu schema */ + private val MaxAlertPayloadLength = 4096 // TODO increase? + + final case class FailedToCreateEventsTable(cause: Throwable) extends Alert + final case class FailedToAddColumns(columns: List[String], cause: Throwable) extends Alert + final case class FailedToOpenSnowflakeChannel(cause: Throwable) extends Alert + + def toSelfDescribingJson( + alert: Alert, + appInfo: AppInfo, + config: Config.Webhook + ): Json = + SelfDescribingData( + schema = SchemaKey("com.snowplowanalytics.monitoring.loader", "alert", "jsonschema", SchemaVer.Full(1, 0, 0)), + data = Json.obj( + "application" -> s"${appInfo.name}-${appInfo.version}".asJson, + "message" -> getMessage(alert).asJson, + "tags" -> config.tags.asJson + ) + ).normalize + + private def getMessage(alert: Alert): String = { + val full = alert match { + case FailedToCreateEventsTable(cause) => show"Failed to create events table: $cause" + case FailedToAddColumns(columns, cause) => show"Failed to add columns: ${columns.mkString("[", ",", "]")}. Cause: $cause" + case FailedToOpenSnowflakeChannel(cause) => show"Failed to open Snowflake channel: $cause" + } + + full.take(MaxAlertPayloadLength) + } + + private implicit def throwableShow: Show[Throwable] = { + def go(acc: List[String], next: Throwable): String = { + val nextMessage = next match { + case t: SQLException => Some(s"${t.getMessage} = SqlState: ${t.getSQLState}") + case t => Option(t.getMessage) + } + val msgs = nextMessage.filterNot(msg => acc.headOption.contains(msg)) ++: acc + + Option(next.getCause) match { + case Some(cause) => go(msgs, cause) + case None => msgs.reverse.mkString(": ") + } + } + + Show.show(go(Nil, _)) + } +} diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala index fe5203c..11e25a0 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala @@ -8,6 +8,7 @@ package com.snowplowanalytics.snowplow.snowflake import cats.Id +import cats.syntax.either._ import io.circe.Decoder import io.circe.generic.extras.semiauto._ import io.circe.generic.extras.Configuration @@ -19,8 +20,9 @@ import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs.schemaCriterionDeco import scala.concurrent.duration.FiniteDuration import scala.util.Try -import com.snowplowanalytics.snowplow.runtime.{Metrics => CommonMetrics, Telemetry} +import com.snowplowanalytics.snowplow.runtime.{Telemetry, Metrics => CommonMetrics} import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._ +import org.http4s.{ParseFailure, Uri} case class Config[+Source, +Sink]( input: Source, @@ -73,9 +75,12 @@ object Config { case class Monitoring( metrics: Metrics, sentry: Option[Sentry], - healthProbe: HealthProbe + healthProbe: HealthProbe, + webhook: Option[Webhook] ) + final case class Webhook(endpoint: Uri, tags: Map[String, String]) + case class Retries(backoff: FiniteDuration) implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = { @@ -93,8 +98,12 @@ object Config { case SentryM(None, _) => None } + implicit val http4sUriDecoder: Decoder[Uri] = + Decoder[String].emap(s => Either.catchOnly[ParseFailure](Uri.unsafeFromString(s)).leftMap(_.toString)) + implicit val metricsDecoder = deriveConfiguredDecoder[Metrics] implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe] + implicit val webhookDecoder = deriveConfiguredDecoder[Webhook] implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring] implicit val retriesDecoder = deriveConfiguredDecoder[Retries] deriveConfiguredDecoder[Config[Source, Sink]] diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala index 4cca944..596804e 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala @@ -8,14 +8,12 @@ package com.snowplowanalytics.snowplow.snowflake import cats.effect.unsafe.implicits.global -import cats.effect.{Async, Resource, Sync} -import cats.implicits._ +import cats.effect.{Async, Resource} import com.snowplowanalytics.iglu.core.SchemaCriterion import com.snowplowanalytics.snowplow.runtime.{AppInfo, HealthProbe} import com.snowplowanalytics.snowplow.sinks.Sink -import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeHealth, SnowflakeRetrying, TableManager} +import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeHealth, TableManager} import com.snowplowanalytics.snowplow.sources.SourceAndAck -import io.sentry.Sentry import org.http4s.blaze.client.BlazeClientBuilder import org.http4s.client.Client @@ -24,7 +22,7 @@ case class Environment[F[_]]( source: SourceAndAck[F], badSink: Sink[F], httpClient: Client[F], - tblManager: TableManager[F], + tableManager: TableManager[F], channelProvider: ChannelProvider[F], metrics: Metrics[F], batching: Config.Batching, @@ -40,51 +38,29 @@ object Environment { toSink: SinkConfig => Resource[F, Sink[F]] ): Resource[F, Environment[F]] = for { + _ <- Sentry.capturingAnyException(appInfo, config.monitoring.sentry) snowflakeHealth <- Resource.eval(SnowflakeHealth.initUnhealthy[F]) sourceAndAck <- Resource.eval(toSource(config.input)) _ <- HealthProbe.resource( config.monitoring.healthProbe.port, AppHealth.isHealthy(config.monitoring.healthProbe, sourceAndAck, snowflakeHealth) ) - _ <- enableSentry[F](appInfo, config.monitoring.sentry) httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource + monitoring <- Monitoring.create[F](config.monitoring.webhook, appInfo, httpClient) badSink <- toSink(config.output.bad) metrics <- Resource.eval(Metrics.build(config.monitoring.metrics)) - xa <- Resource.eval(SQLUtils.transactor[F](config.output.good)) - _ <- Resource.eval(SnowflakeRetrying.retryIndefinitely(snowflakeHealth, config.retries)(SQLUtils.createTable(config.output.good, xa))) - tblManager = TableManager.fromTransactor(config.output.good, xa, snowflakeHealth, config.retries) - channelProvider <- ChannelProvider.make(config.output.good, snowflakeHealth, config.batching, config.retries) - + tableManager <- Resource.eval(TableManager.make(config.output.good, snowflakeHealth, config.retries, monitoring)) + _ <- Resource.eval(tableManager.initializeEventsTable()) + channelProvider <- ChannelProvider.make(config.output.good, snowflakeHealth, config.batching, config.retries, monitoring) } yield Environment( appInfo = appInfo, source = sourceAndAck, badSink = badSink, httpClient = httpClient, - tblManager = tblManager, + tableManager = tableManager, channelProvider = channelProvider, metrics = metrics, batching = config.batching, schemasToSkip = config.skipSchemas ) - - private def enableSentry[F[_]: Sync](appInfo: AppInfo, config: Option[Config.Sentry]): Resource[F, Unit] = - config match { - case Some(c) => - val acquire = Sync[F].delay { - Sentry.init { options => - options.setDsn(c.dsn) - options.setRelease(appInfo.version) - c.tags.foreach { case (k, v) => - options.setTag(k, v) - } - } - } - - Resource.makeCase(acquire) { - case (_, Resource.ExitCase.Errored(e)) => Sync[F].delay(Sentry.captureException(e)).void - case _ => Sync[F].unit - } - case None => - Resource.unit[F] - } } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Monitoring.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Monitoring.scala new file mode 100644 index 0000000..57daa6d --- /dev/null +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Monitoring.scala @@ -0,0 +1,60 @@ +package com.snowplowanalytics.snowplow.snowflake + +import cats.effect.{Resource, Sync} +import cats.implicits._ +import com.snowplowanalytics.snowplow.runtime.AppInfo +import org.http4s.circe.jsonEncoder +import org.http4s.client.Client +import org.http4s.{EntityDecoder, Method, Request} +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +trait Monitoring[F[_]] { + def alert(message: Alert): F[Unit] +} + +object Monitoring { + + private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] + + def create[F[_]: Sync]( + config: Option[Config.Webhook], + appInfo: AppInfo, + httpClient: Client[F] + )(implicit E: EntityDecoder[F, String] + ): Resource[F, Monitoring[F]] = Resource.pure { + new Monitoring[F] { + + override def alert(message: Alert): F[Unit] = + config match { + case Some(webhookConfig) => + val request = buildHttpRequest(webhookConfig, message) + executeHttpRequest(webhookConfig, httpClient, request) + case None => + Logger[F].debug("Webhook monitoring is not configured, skipping alert") + } + + def buildHttpRequest(webhookConfig: Config.Webhook, alert: Alert): Request[F] = + Request[F](Method.POST, webhookConfig.endpoint) + .withEntity(Alert.toSelfDescribingJson(alert, appInfo, webhookConfig)) + + def executeHttpRequest( + webhookConfig: Config.Webhook, + httpClient: Client[F], + request: Request[F] + ): F[Unit] = + httpClient + .run(request) + .use { response => + if (response.status.isSuccess) Sync[F].unit + else { + response.as[String].flatMap(body => Logger[F].error(s"Webhook ${webhookConfig.endpoint} returned non-2xx response:\n$body")) + } + } + .handleErrorWith { e => + Logger[F].error(e)(s"Webhook ${webhookConfig.endpoint} resulted in exception without a response") + } + } + } + +} diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Sentry.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Sentry.scala new file mode 100644 index 0000000..0016a7f --- /dev/null +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Sentry.scala @@ -0,0 +1,38 @@ +package com.snowplowanalytics.snowplow.snowflake + +import cats.effect.{Resource, Sync} +import cats.implicits.catsSyntaxApplyOps +import com.snowplowanalytics.snowplow.runtime.AppInfo +import io.sentry.{Sentry => JSentry, SentryOptions} + +object Sentry { + + def capturingAnyException[F[_]: Sync](appInfo: AppInfo, config: Option[Config.Sentry]): Resource[F, Unit] = + config match { + case Some(sentryConfig) => + initSentry(appInfo, sentryConfig) + case None => + Resource.unit[F] + } + + private def initSentry[F[_]: Sync](appInfo: AppInfo, sentryConfig: Config.Sentry): Resource[F, Unit] = { + val acquire = Sync[F].delay(JSentry.init(createSentryOptions(appInfo, sentryConfig))) + val release = Sync[F].delay(JSentry.close()) + + Resource.makeCase(acquire) { + case (_, Resource.ExitCase.Errored(e)) => Sync[F].delay(JSentry.captureException(e)) *> release + case _ => release + + } + } + + private def createSentryOptions(appInfo: AppInfo, sentryConfig: Config.Sentry): SentryOptions = { + val options = new SentryOptions + options.setDsn(sentryConfig.dsn) + options.setRelease(appInfo.version) + sentryConfig.tags.foreach { case (k, v) => + options.setTag(k, v) + } + options + } +} diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala index 07da9b4..149fb91 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala @@ -7,26 +7,18 @@ */ package com.snowplowanalytics.snowplow.snowflake.processing -import cats.implicits._ import cats.effect.implicits._ -import cats.effect.{Async, Sync} import cats.effect.kernel.{Ref, Resource} import cats.effect.std.{Hotswap, Semaphore} +import cats.effect.{Async, Sync} +import cats.implicits._ +import com.snowplowanalytics.snowplow.snowflake.{Alert, Config, Monitoring} +import net.snowflake.ingest.streaming.internal.SnowsFlakePlowInterop +import net.snowflake.ingest.streaming._ +import net.snowflake.ingest.utils.{ParameterProvider, SFException, ErrorCode => SFErrorCode} import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -import net.snowflake.ingest.streaming.{ - InsertValidationResponse, - OpenChannelRequest, - SnowflakeStreamingIngestChannel, - SnowflakeStreamingIngestClient, - SnowflakeStreamingIngestClientFactory -} -import net.snowflake.ingest.streaming.internal.SnowsFlakePlowInterop -import net.snowflake.ingest.utils.{ErrorCode => SFErrorCode, ParameterProvider, SFException} - -import com.snowplowanalytics.snowplow.snowflake.Config - import java.time.ZoneOffset import java.util.Properties import scala.jdk.CollectionConverters._ @@ -111,11 +103,12 @@ object ChannelProvider { config: Config.Snowflake, snowflakeHealth: SnowflakeHealth[F], batchingConfig: Config.Batching, - retriesConfig: Config.Retries + retriesConfig: Config.Retries, + monitoring: Monitoring[F] ): Resource[F, ChannelProvider[F]] = for { client <- createClient(config, batchingConfig) - channelResource = createChannel(config, client, snowflakeHealth, retriesConfig) + channelResource = createChannel(config, client, snowflakeHealth, retriesConfig, monitoring) (hs, channel) <- Hotswap.apply(channelResource) ref <- Resource.eval(Ref[F].of(channel)) sem <- Resource.eval(Semaphore[F](allAvailablePermits)) @@ -196,7 +189,8 @@ object ChannelProvider { config: Config.Snowflake, client: SnowflakeStreamingIngestClient, snowflakeHealth: SnowflakeHealth[F], - retriesConfig: Config.Retries + retriesConfig: Config.Retries, + monitoring: Monitoring[F] ): Resource[F, SnowflakeStreamingIngestChannel] = { val request = OpenChannelRequest .builder(config.channel) @@ -209,7 +203,11 @@ object ChannelProvider { val make = Logger[F].info(s"Opening channel ${config.channel}") *> SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) { - Async[F].blocking(client.openChannel(request)) + Async[F] + .blocking(client.openChannel(request)) + .onError { cause => + monitoring.alert(Alert.FailedToOpenSnowflakeChannel(cause)) + } } Resource.make(make) { channel => diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/JdbcTransactor.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/JdbcTransactor.scala new file mode 100644 index 0000000..79572c0 --- /dev/null +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/JdbcTransactor.scala @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.snowflake + +import cats.effect.{Async, Sync} +import cats.implicits._ +import doobie.Transactor +import net.snowflake.ingest.utils.{Utils => SnowflakeSdkUtils} + +import java.security.PrivateKey +import java.util.Properties + +object JdbcTransactor { + + private val driver: String = "net.snowflake.client.jdbc.SnowflakeDriver" + + def make[F[_]: Async](config: Config.Snowflake): F[Transactor[F]] = + for { + privateKey <- parsePrivateKey[F](config) + props = jdbcProperties(config, privateKey) + } yield Transactor.fromDriverManager[F](driver, config.url.getJdbcUrl, props, None) + + private def parsePrivateKey[F[_]: Sync](config: Config.Snowflake): F[PrivateKey] = + Sync[F].delay { // Wrap in Sync because these can raise exceptions + config.privateKeyPassphrase match { + case Some(passphrase) => + SnowflakeSdkUtils.parseEncryptedPrivateKey(config.privateKey, passphrase) + case None => + SnowflakeSdkUtils.parsePrivateKey(config.privateKey) + } + } + + private def jdbcProperties(config: Config.Snowflake, privateKey: PrivateKey): Properties = { + val props = new Properties() + props.setProperty("user", config.user) + props.put("privateKey", privateKey) + props.setProperty("timezone", "UTC") + config.role.foreach(props.setProperty("role", _)) + props.put("loginTimeout", config.jdbcLoginTimeout.toSeconds.toInt) + props.put("networkTimeout", config.jdbcNetworkTimeout.toMillis.toInt) + props.put("queryTimeout", config.jdbcQueryTimeout.toSeconds.toInt) + props + } +} diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala index ca795e3..0b8e69b 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala @@ -19,7 +19,6 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger import java.nio.charset.StandardCharsets import java.time.OffsetDateTime - import com.snowplowanalytics.iglu.schemaddl.parquet.Caster import com.snowplowanalytics.snowplow.analytics.scalasdk.Event import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Processor => BadRowProcessor} @@ -339,7 +338,7 @@ object Processing { ().pure[F] else env.channelProvider.withClosedChannel { - env.tblManager.addColumns(extraColsRequired.toList) + env.tableManager.addColumns(extraColsRequired.toList) } private def sendFailedEvents[F[_]: Applicative, A](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] = diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SQLUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SQLUtils.scala deleted file mode 100644 index 1e21e46..0000000 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SQLUtils.scala +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Snowplow Community License Version 1.0, - * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. - * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 - */ -package com.snowplowanalytics.snowplow.snowflake - -import cats.implicits._ -import cats.effect.{Async, Sync} -import doobie.{ConnectionIO, Fragment, Transactor} -import doobie.implicits._ -import org.typelevel.log4cats.Logger -import org.typelevel.log4cats.slf4j.Slf4jLogger -import net.snowflake.ingest.utils.{Utils => SnowflakeSdkUtils} - -import java.util.Properties -import java.security.PrivateKey - -object SQLUtils { - - private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] - - private val driver: String = "net.snowflake.client.jdbc.SnowflakeDriver" - - def transactor[F[_]: Async](config: Config.Snowflake): F[Transactor[F]] = - for { - privateKey <- parsePrivateKey[F](config) - props = jdbcProperties(config, privateKey) - } yield Transactor.fromDriverManager[F](driver, config.url.getJdbcUrl, props, None) - - private def parsePrivateKey[F[_]: Sync](config: Config.Snowflake): F[PrivateKey] = - Sync[F].delay { // Wrap in Sync because these can raise exceptions - config.privateKeyPassphrase match { - case Some(passphrase) => - SnowflakeSdkUtils.parseEncryptedPrivateKey(config.privateKey, passphrase) - case None => - SnowflakeSdkUtils.parsePrivateKey(config.privateKey) - } - } - - private def jdbcProperties(config: Config.Snowflake, privateKey: PrivateKey): Properties = { - val props = new Properties() - props.setProperty("user", config.user) - props.put("privateKey", privateKey) - props.setProperty("timezone", "UTC") - config.role.foreach(props.setProperty("role", _)) - props.put("loginTimeout", config.jdbcLoginTimeout.toSeconds.toInt) - props.put("networkTimeout", config.jdbcNetworkTimeout.toMillis.toInt) - props.put("queryTimeout", config.jdbcQueryTimeout.toSeconds.toInt) - props - } - - def createTable[F[_]: Sync](config: Config.Snowflake, xa: Transactor[F]): F[Unit] = { - val t = fqTableName(config) - Logger[F].info(s"Opening JDBC connection to ${config.url.getJdbcUrl}") *> - xa.rawTrans.apply { - Logger[ConnectionIO].info(s"Creating table $t if it does not already exist...") *> - sqlCreateTable(t).update.run.void - } - } - - // fully qualified name - def fqTableName(config: Config.Snowflake): String = - s"${config.database}.${config.schema}.${config.table}" - - private def sqlCreateTable(tableName: String): Fragment = sql""" - CREATE TABLE IF NOT EXISTS identifier($tableName) ( - app_id VARCHAR, - platform VARCHAR, - etl_tstamp TIMESTAMP, - collector_tstamp TIMESTAMP NOT NULL, - dvce_created_tstamp TIMESTAMP, - event VARCHAR, - event_id VARCHAR NOT NULL UNIQUE, - txn_id INTEGER, - name_tracker VARCHAR, - v_tracker VARCHAR, - v_collector VARCHAR NOT NULL, - v_etl VARCHAR NOT NULL, - user_id VARCHAR, - user_ipaddress VARCHAR, - user_fingerprint VARCHAR, - domain_userid VARCHAR, - domain_sessionidx SMALLINT, - network_userid VARCHAR, - geo_country VARCHAR, - geo_region VARCHAR, - geo_city VARCHAR, - geo_zipcode VARCHAR, - geo_latitude DOUBLE PRECISION, - geo_longitude DOUBLE PRECISION, - geo_region_name VARCHAR, - ip_isp VARCHAR, - ip_organization VARCHAR, - ip_domain VARCHAR, - ip_netspeed VARCHAR, - page_url VARCHAR, - page_title VARCHAR, - page_referrer VARCHAR, - page_urlscheme VARCHAR, - page_urlhost VARCHAR, - page_urlport INTEGER, - page_urlpath VARCHAR, - page_urlquery VARCHAR, - page_urlfragment VARCHAR, - refr_urlscheme VARCHAR, - refr_urlhost VARCHAR, - refr_urlport INTEGER, - refr_urlpath VARCHAR, - refr_urlquery VARCHAR, - refr_urlfragment VARCHAR, - refr_medium VARCHAR, - refr_source VARCHAR, - refr_term VARCHAR, - mkt_medium VARCHAR, - mkt_source VARCHAR, - mkt_term VARCHAR, - mkt_content VARCHAR, - mkt_campaign VARCHAR, - se_category VARCHAR, - se_action VARCHAR, - se_label VARCHAR, - se_property VARCHAR, - se_value DOUBLE PRECISION, - tr_orderid VARCHAR, - tr_affiliation VARCHAR, - tr_total NUMBER(18,2), - tr_tax NUMBER(18,2), - tr_shipping NUMBER(18,2), - tr_city VARCHAR, - tr_state VARCHAR, - tr_country VARCHAR, - ti_orderid VARCHAR, - ti_sku VARCHAR, - ti_name VARCHAR, - ti_category VARCHAR, - ti_price NUMBER(18,2), - ti_quantity INTEGER, - pp_xoffset_min INTEGER, - pp_xoffset_max INTEGER, - pp_yoffset_min INTEGER, - pp_yoffset_max INTEGER, - useragent VARCHAR, - br_name VARCHAR, - br_family VARCHAR, - br_version VARCHAR, - br_type VARCHAR, - br_renderengine VARCHAR, - br_lang VARCHAR, - br_features_pdf BOOLEAN, - br_features_flash BOOLEAN, - br_features_java BOOLEAN, - br_features_director BOOLEAN, - br_features_quicktime BOOLEAN, - br_features_realplayer BOOLEAN, - br_features_windowsmedia BOOLEAN, - br_features_gears BOOLEAN, - br_features_silverlight BOOLEAN, - br_cookies BOOLEAN, - br_colordepth VARCHAR, - br_viewwidth INTEGER, - br_viewheight INTEGER, - os_name VARCHAR, - os_family VARCHAR, - os_manufacturer VARCHAR, - os_timezone VARCHAR, - dvce_type VARCHAR, - dvce_ismobile BOOLEAN, - dvce_screenwidth INTEGER, - dvce_screenheight INTEGER, - doc_charset VARCHAR, - doc_width INTEGER, - doc_height INTEGER, - tr_currency VARCHAR, - tr_total_base NUMBER(18, 2), - tr_tax_base NUMBER(18, 2), - tr_shipping_base NUMBER(18, 2), - ti_currency VARCHAR, - ti_price_base NUMBER(18, 2), - base_currency VARCHAR, - geo_timezone VARCHAR, - mkt_clickid VARCHAR, - mkt_network VARCHAR, - etl_tags VARCHAR, - dvce_sent_tstamp TIMESTAMP, - refr_domain_userid VARCHAR, - refr_dvce_tstamp TIMESTAMP, - domain_sessionid VARCHAR, - derived_tstamp TIMESTAMP, - event_vendor VARCHAR, - event_name VARCHAR, - event_format VARCHAR, - event_version VARCHAR, - event_fingerprint VARCHAR, - true_tstamp TIMESTAMP, - load_tstamp TIMESTAMP, - CONSTRAINT event_id_pk PRIMARY KEY(event_id) - ) - """ -} diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala index ca103c1..d2123a9 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala @@ -7,20 +7,21 @@ */ package com.snowplowanalytics.snowplow.snowflake.processing -import cats.implicits._ import cats.effect.{Async, Sync} -import doobie.{ConnectionIO, Fragment, Transactor} +import cats.implicits._ +import com.snowplowanalytics.snowplow.snowflake.{Alert, Config, JdbcTransactor, Monitoring} import doobie.implicits._ +import doobie.{ConnectionIO, Fragment} +import net.snowflake.client.jdbc.SnowflakeSQLException import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -import net.snowflake.client.jdbc.SnowflakeSQLException - -import com.snowplowanalytics.snowplow.snowflake.{Config, SQLUtils} import scala.util.matching.Regex trait TableManager[F[_]] { + def initializeEventsTable(): F[Unit] + def addColumns(columns: List[String]): F[Unit] } @@ -29,32 +30,61 @@ object TableManager { private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] - def fromTransactor[F[_]: Async]( + def make[F[_]: Async]( config: Config.Snowflake, - xa: Transactor[F], snowflakeHealth: SnowflakeHealth[F], - retriesConfig: Config.Retries - ): TableManager[F] = new TableManager[F] { - - def addColumns(columns: List[String]): F[Unit] = SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) { - Logger[F].info(s"Altering table to add columns [${columns.mkString(", ")}]") *> - xa.rawTrans.apply { - columns.traverse_ { col => - sqlAlterTable(config, col).update.run.void - .recoverWith { - case e: SnowflakeSQLException if e.getErrorCode === 1430 => - Logger[ConnectionIO].info(show"Column already exists: $col") - } + retriesConfig: Config.Retries, + monitoring: Monitoring[F] + ): F[TableManager[F]] = + JdbcTransactor.make(config).map { transactor => + new TableManager[F] { + + override def initializeEventsTable(): F[Unit] = + SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) { + Logger[F].info(s"Opening JDBC connection to ${config.url.getJdbcUrl}") *> + executeInitTableQuery() + .onError { cause => + monitoring.alert(Alert.FailedToCreateEventsTable(cause)) + } + } + + override def addColumns(columns: List[String]): F[Unit] = + SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) { + Logger[F].info(s"Altering table to add columns [${columns.mkString(", ")}]") *> + executeAddColumnsQuery(columns) + .onError { cause => + monitoring.alert(Alert.FailedToAddColumns(columns, cause)) + } } + + def executeInitTableQuery(): F[Unit] = { + val tableName = fqTableName(config) + + transactor.rawTrans + .apply { + Logger[ConnectionIO].info(s"Creating table $tableName if it does not already exist...") *> + sqlCreateTable(tableName).update.run.void + } } + + def executeAddColumnsQuery(columns: List[String]): F[Unit] = + transactor.rawTrans.apply { + columns.traverse_ { column => + sqlAlterTable(config, column).update.run.void + .recoverWith { + case e: SnowflakeSQLException if e.getErrorCode === 1430 => + Logger[ConnectionIO].info(show"Column already exists: $column") + } + } + } + } } - } private val reUnstruct: Regex = "^unstruct_event_.*$".r private val reContext: Regex = "^contexts_.*$".r private def sqlAlterTable(config: Config.Snowflake, colName: String): Fragment = { - val tableName = SQLUtils.fqTableName(config) + val tableName = fqTableName(config) val colType = colName match { case reUnstruct() => "OBJECT" case reContext() => "ARRAY" @@ -68,4 +98,143 @@ object TableManager { """ } + // fully qualified name + private def fqTableName(config: Config.Snowflake): String = + s"${config.database}.${config.schema}.${config.table}" + + private def sqlCreateTable(tableName: String): Fragment = + sql""" + CREATE TABLE IF NOT EXISTS identifier($tableName) ( + app_id VARCHAR, + platform VARCHAR, + etl_tstamp TIMESTAMP, + collector_tstamp TIMESTAMP NOT NULL, + dvce_created_tstamp TIMESTAMP, + event VARCHAR, + event_id VARCHAR NOT NULL UNIQUE, + txn_id INTEGER, + name_tracker VARCHAR, + v_tracker VARCHAR, + v_collector VARCHAR NOT NULL, + v_etl VARCHAR NOT NULL, + user_id VARCHAR, + user_ipaddress VARCHAR, + user_fingerprint VARCHAR, + domain_userid VARCHAR, + domain_sessionidx SMALLINT, + network_userid VARCHAR, + geo_country VARCHAR, + geo_region VARCHAR, + geo_city VARCHAR, + geo_zipcode VARCHAR, + geo_latitude DOUBLE PRECISION, + geo_longitude DOUBLE PRECISION, + geo_region_name VARCHAR, + ip_isp VARCHAR, + ip_organization VARCHAR, + ip_domain VARCHAR, + ip_netspeed VARCHAR, + page_url VARCHAR, + page_title VARCHAR, + page_referrer VARCHAR, + page_urlscheme VARCHAR, + page_urlhost VARCHAR, + page_urlport INTEGER, + page_urlpath VARCHAR, + page_urlquery VARCHAR, + page_urlfragment VARCHAR, + refr_urlscheme VARCHAR, + refr_urlhost VARCHAR, + refr_urlport INTEGER, + refr_urlpath VARCHAR, + refr_urlquery VARCHAR, + refr_urlfragment VARCHAR, + refr_medium VARCHAR, + refr_source VARCHAR, + refr_term VARCHAR, + mkt_medium VARCHAR, + mkt_source VARCHAR, + mkt_term VARCHAR, + mkt_content VARCHAR, + mkt_campaign VARCHAR, + se_category VARCHAR, + se_action VARCHAR, + se_label VARCHAR, + se_property VARCHAR, + se_value DOUBLE PRECISION, + tr_orderid VARCHAR, + tr_affiliation VARCHAR, + tr_total NUMBER(18,2), + tr_tax NUMBER(18,2), + tr_shipping NUMBER(18,2), + tr_city VARCHAR, + tr_state VARCHAR, + tr_country VARCHAR, + ti_orderid VARCHAR, + ti_sku VARCHAR, + ti_name VARCHAR, + ti_category VARCHAR, + ti_price NUMBER(18,2), + ti_quantity INTEGER, + pp_xoffset_min INTEGER, + pp_xoffset_max INTEGER, + pp_yoffset_min INTEGER, + pp_yoffset_max INTEGER, + useragent VARCHAR, + br_name VARCHAR, + br_family VARCHAR, + br_version VARCHAR, + br_type VARCHAR, + br_renderengine VARCHAR, + br_lang VARCHAR, + br_features_pdf BOOLEAN, + br_features_flash BOOLEAN, + br_features_java BOOLEAN, + br_features_director BOOLEAN, + br_features_quicktime BOOLEAN, + br_features_realplayer BOOLEAN, + br_features_windowsmedia BOOLEAN, + br_features_gears BOOLEAN, + br_features_silverlight BOOLEAN, + br_cookies BOOLEAN, + br_colordepth VARCHAR, + br_viewwidth INTEGER, + br_viewheight INTEGER, + os_name VARCHAR, + os_family VARCHAR, + os_manufacturer VARCHAR, + os_timezone VARCHAR, + dvce_type VARCHAR, + dvce_ismobile BOOLEAN, + dvce_screenwidth INTEGER, + dvce_screenheight INTEGER, + doc_charset VARCHAR, + doc_width INTEGER, + doc_height INTEGER, + tr_currency VARCHAR, + tr_total_base NUMBER(18, 2), + tr_tax_base NUMBER(18, 2), + tr_shipping_base NUMBER(18, 2), + ti_currency VARCHAR, + ti_price_base NUMBER(18, 2), + base_currency VARCHAR, + geo_timezone VARCHAR, + mkt_clickid VARCHAR, + mkt_network VARCHAR, + etl_tags VARCHAR, + dvce_sent_tstamp TIMESTAMP, + refr_domain_userid VARCHAR, + refr_dvce_tstamp TIMESTAMP, + domain_sessionid VARCHAR, + derived_tstamp TIMESTAMP, + event_vendor VARCHAR, + event_name VARCHAR, + event_format VARCHAR, + event_version VARCHAR, + event_fingerprint VARCHAR, + true_tstamp TIMESTAMP, + load_tstamp TIMESTAMP, + CONSTRAINT event_id_pk PRIMARY KEY(event_id) + ) + """ } diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala index ba8d841..4a21758 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala @@ -25,6 +25,7 @@ object MockEnvironment { sealed trait Action object Action { + case object InitEventsTable extends Action case class Checkpointed(tokens: List[Unique.Token]) extends Action case class SentToBad(count: Int) extends Action case class AlterTableAddedColumns(columns: List[String]) extends Action @@ -57,7 +58,7 @@ object MockEnvironment { source = testSourceAndAck(inputs, state), badSink = testSink(state), httpClient = testHttpClient, - tblManager = testTableManager(state), + tableManager = testTableManager(state), channelProvider = channelProvider, metrics = testMetrics(state), batching = Config.Batching( @@ -78,6 +79,10 @@ object MockEnvironment { } private def testTableManager(state: Ref[IO, Vector[Action]]): TableManager[IO] = new TableManager[IO] { + + override def initializeEventsTable(): IO[Unit] = + state.update(_ :+ InitEventsTable) + def addColumns(columns: List[String]): IO[Unit] = state.update(_ :+ AlterTableAddedColumns(columns)) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 70fdfb2..02f1199 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -38,6 +38,7 @@ object Dependencies { val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry val blazeClient = "org.http4s" %% "http4s-blaze-client" % V.http4s + val http4sCirce = "org.http4s" %% "http4s-circe" % V.http4s val decline = "com.monovore" %% "decline-effect" % V.decline val circeGenericExtra = "io.circe" %% "circe-generic-extras" % V.circe val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % V.betterMonadicFor @@ -69,6 +70,7 @@ object Dependencies { runtime, catsRetry, blazeClient, + http4sCirce, decline, sentry, snowflakeIngest,