From 78aea9928ef2a25fe47bb837cd762e25b3ff6f90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Poniedzia=C5=82ek?= Date: Tue, 12 Dec 2023 15:22:24 +0100 Subject: [PATCH] Draft `Monitoring` trait --- .../Environment.scala | 44 +--- .../Monitoring.scala | 59 +++++ .../Run.scala | 3 + .../processing/JdbcTransactor.scala | 49 +++++ .../processing/Processing.scala | 7 +- .../processing/SQLUtils.scala | 202 ----------------- .../processing/TableManager.scala | 208 ++++++++++++++++-- .../MockEnvironment.scala | 16 +- .../processing/ProcessingSpec.scala | 8 + 9 files changed, 334 insertions(+), 262 deletions(-) create mode 100644 modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Monitoring.scala create mode 100644 modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/JdbcTransactor.scala delete mode 100644 modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SQLUtils.scala diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala index 4cca944..850d19b 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala @@ -8,14 +8,12 @@ package com.snowplowanalytics.snowplow.snowflake import cats.effect.unsafe.implicits.global -import cats.effect.{Async, Resource, Sync} -import cats.implicits._ +import cats.effect.{Async, Resource} import com.snowplowanalytics.iglu.core.SchemaCriterion import com.snowplowanalytics.snowplow.runtime.{AppInfo, HealthProbe} import com.snowplowanalytics.snowplow.sinks.Sink -import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeHealth, SnowflakeRetrying, TableManager} +import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, SnowflakeHealth, TableManager} import com.snowplowanalytics.snowplow.sources.SourceAndAck -import io.sentry.Sentry import org.http4s.blaze.client.BlazeClientBuilder import org.http4s.client.Client @@ -24,11 +22,12 @@ case class Environment[F[_]]( source: SourceAndAck[F], badSink: Sink[F], httpClient: Client[F], - tblManager: TableManager[F], + tableManager: TableManager[F], channelProvider: ChannelProvider[F], metrics: Metrics[F], batching: Config.Batching, - schemasToSkip: List[SchemaCriterion] + schemasToSkip: List[SchemaCriterion], + monitoring: Monitoring[F] ) object Environment { @@ -46,45 +45,22 @@ object Environment { config.monitoring.healthProbe.port, AppHealth.isHealthy(config.monitoring.healthProbe, sourceAndAck, snowflakeHealth) ) - _ <- enableSentry[F](appInfo, config.monitoring.sentry) + monitoring <- Monitoring.create[F](appInfo, config.monitoring) httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource badSink <- toSink(config.output.bad) metrics <- Resource.eval(Metrics.build(config.monitoring.metrics)) - xa <- Resource.eval(SQLUtils.transactor[F](config.output.good)) - _ <- Resource.eval(SnowflakeRetrying.retryIndefinitely(snowflakeHealth, config.retries)(SQLUtils.createTable(config.output.good, xa))) - tblManager = TableManager.fromTransactor(config.output.good, xa, snowflakeHealth, config.retries) + tableManager <- Resource.eval(TableManager.make(config.output.good, snowflakeHealth, config.retries, monitoring)) channelProvider <- ChannelProvider.make(config.output.good, snowflakeHealth, config.batching, config.retries) - } yield Environment( appInfo = appInfo, source = sourceAndAck, badSink = badSink, httpClient = httpClient, - tblManager = tblManager, + tableManager = tableManager, channelProvider = channelProvider, metrics = metrics, batching = config.batching, - schemasToSkip = config.skipSchemas + schemasToSkip = config.skipSchemas, + monitoring = monitoring ) - - private def enableSentry[F[_]: Sync](appInfo: AppInfo, config: Option[Config.Sentry]): Resource[F, Unit] = - config match { - case Some(c) => - val acquire = Sync[F].delay { - Sentry.init { options => - options.setDsn(c.dsn) - options.setRelease(appInfo.version) - c.tags.foreach { case (k, v) => - options.setTag(k, v) - } - } - } - - Resource.makeCase(acquire) { - case (_, Resource.ExitCase.Errored(e)) => Sync[F].delay(Sentry.captureException(e)).void - case _ => Sync[F].unit - } - case None => - Resource.unit[F] - } } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Monitoring.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Monitoring.scala new file mode 100644 index 0000000..f8c68fc --- /dev/null +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Monitoring.scala @@ -0,0 +1,59 @@ +package com.snowplowanalytics.snowplow.snowflake + +import cats.effect.Resource +import cats.effect.kernel.Sync +import cats.implicits._ +import com.snowplowanalytics.snowplow.runtime.AppInfo +import io.sentry.{Sentry, SentryOptions} + +trait Monitoring[F[_]] { + def alert(message: Monitoring.Alert): F[Unit] + def reportFatal(exception: Throwable): F[Unit] +} + +object Monitoring { + + sealed trait Alert + + object Alert { + final case class FailedToCreateEventsTable(cause: Throwable) extends Alert + // TODO add more! + + } + + def create[F[_]: Sync](appInfo: AppInfo, config: Config.Monitoring): Resource[F, Monitoring[F]] = + initSentry(appInfo, config.sentry).map { _ => + new Monitoring[F] { + override def alert(message: Alert): F[Unit] = + Sync[F].unit // TODO + + override def reportFatal(exception: Throwable): F[Unit] = + config.sentry.fold(Sync[F].unit) { _ => + Sync[F].delay(Sentry.captureException(exception)).void + } + + } + } + + private def initSentry[F[_]: Sync](appInfo: AppInfo, config: Option[Config.Sentry]): Resource[F, Unit] = + config match { + case Some(sentryConfig) => + Resource.make { + Sync[F].delay(Sentry.init(createSentryOptions(appInfo, sentryConfig))) + } { _ => + Sync[F].delay(Sentry.close()) + } + case None => + Resource.unit[F] + } + + private def createSentryOptions(appInfo: AppInfo, sentryConfig: Config.Sentry): SentryOptions = { + val options = new SentryOptions + options.setDsn(sentryConfig.dsn) + options.setRelease(appInfo.version) + sentryConfig.tags.foreach { case (k, v) => + options.setTag(k, v) + } + options + } +} diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Run.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Run.scala index 683d492..93fcba2 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Run.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Run.scala @@ -72,6 +72,9 @@ object Run { .compile .drain .as(ExitCode.Success) + .onError { fatalAppError => + env.monitoring.reportFatal(fatalAppError) + } } } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/JdbcTransactor.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/JdbcTransactor.scala new file mode 100644 index 0000000..79572c0 --- /dev/null +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/JdbcTransactor.scala @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.snowflake + +import cats.effect.{Async, Sync} +import cats.implicits._ +import doobie.Transactor +import net.snowflake.ingest.utils.{Utils => SnowflakeSdkUtils} + +import java.security.PrivateKey +import java.util.Properties + +object JdbcTransactor { + + private val driver: String = "net.snowflake.client.jdbc.SnowflakeDriver" + + def make[F[_]: Async](config: Config.Snowflake): F[Transactor[F]] = + for { + privateKey <- parsePrivateKey[F](config) + props = jdbcProperties(config, privateKey) + } yield Transactor.fromDriverManager[F](driver, config.url.getJdbcUrl, props, None) + + private def parsePrivateKey[F[_]: Sync](config: Config.Snowflake): F[PrivateKey] = + Sync[F].delay { // Wrap in Sync because these can raise exceptions + config.privateKeyPassphrase match { + case Some(passphrase) => + SnowflakeSdkUtils.parseEncryptedPrivateKey(config.privateKey, passphrase) + case None => + SnowflakeSdkUtils.parsePrivateKey(config.privateKey) + } + } + + private def jdbcProperties(config: Config.Snowflake, privateKey: PrivateKey): Properties = { + val props = new Properties() + props.setProperty("user", config.user) + props.put("privateKey", privateKey) + props.setProperty("timezone", "UTC") + config.role.foreach(props.setProperty("role", _)) + props.put("loginTimeout", config.jdbcLoginTimeout.toSeconds.toInt) + props.put("networkTimeout", config.jdbcNetworkTimeout.toMillis.toInt) + props.put("queryTimeout", config.jdbcQueryTimeout.toSeconds.toInt) + props + } +} diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala index ca795e3..3bba106 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala @@ -19,7 +19,6 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger import java.nio.charset.StandardCharsets import java.time.OffsetDateTime - import com.snowplowanalytics.iglu.schemaddl.parquet.Caster import com.snowplowanalytics.snowplow.analytics.scalasdk.Event import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Processor => BadRowProcessor} @@ -37,7 +36,9 @@ object Processing { def stream[F[_]: Async](env: Environment[F]): Stream[F, Nothing] = { val eventProcessingConfig = EventProcessingConfig(EventProcessingConfig.NoWindowing) - env.source.stream(eventProcessingConfig, eventProcessor(env)) + + Stream.eval(env.tableManager.initializeEventsTable()) >> + env.source.stream(eventProcessingConfig, eventProcessor(env)) } /** Model used between stages of the processing pipeline */ @@ -339,7 +340,7 @@ object Processing { ().pure[F] else env.channelProvider.withClosedChannel { - env.tblManager.addColumns(extraColsRequired.toList) + env.tableManager.addColumns(extraColsRequired.toList) } private def sendFailedEvents[F[_]: Applicative, A](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] = diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SQLUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SQLUtils.scala deleted file mode 100644 index 1e21e46..0000000 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SQLUtils.scala +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Snowplow Community License Version 1.0, - * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. - * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 - */ -package com.snowplowanalytics.snowplow.snowflake - -import cats.implicits._ -import cats.effect.{Async, Sync} -import doobie.{ConnectionIO, Fragment, Transactor} -import doobie.implicits._ -import org.typelevel.log4cats.Logger -import org.typelevel.log4cats.slf4j.Slf4jLogger -import net.snowflake.ingest.utils.{Utils => SnowflakeSdkUtils} - -import java.util.Properties -import java.security.PrivateKey - -object SQLUtils { - - private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] - - private val driver: String = "net.snowflake.client.jdbc.SnowflakeDriver" - - def transactor[F[_]: Async](config: Config.Snowflake): F[Transactor[F]] = - for { - privateKey <- parsePrivateKey[F](config) - props = jdbcProperties(config, privateKey) - } yield Transactor.fromDriverManager[F](driver, config.url.getJdbcUrl, props, None) - - private def parsePrivateKey[F[_]: Sync](config: Config.Snowflake): F[PrivateKey] = - Sync[F].delay { // Wrap in Sync because these can raise exceptions - config.privateKeyPassphrase match { - case Some(passphrase) => - SnowflakeSdkUtils.parseEncryptedPrivateKey(config.privateKey, passphrase) - case None => - SnowflakeSdkUtils.parsePrivateKey(config.privateKey) - } - } - - private def jdbcProperties(config: Config.Snowflake, privateKey: PrivateKey): Properties = { - val props = new Properties() - props.setProperty("user", config.user) - props.put("privateKey", privateKey) - props.setProperty("timezone", "UTC") - config.role.foreach(props.setProperty("role", _)) - props.put("loginTimeout", config.jdbcLoginTimeout.toSeconds.toInt) - props.put("networkTimeout", config.jdbcNetworkTimeout.toMillis.toInt) - props.put("queryTimeout", config.jdbcQueryTimeout.toSeconds.toInt) - props - } - - def createTable[F[_]: Sync](config: Config.Snowflake, xa: Transactor[F]): F[Unit] = { - val t = fqTableName(config) - Logger[F].info(s"Opening JDBC connection to ${config.url.getJdbcUrl}") *> - xa.rawTrans.apply { - Logger[ConnectionIO].info(s"Creating table $t if it does not already exist...") *> - sqlCreateTable(t).update.run.void - } - } - - // fully qualified name - def fqTableName(config: Config.Snowflake): String = - s"${config.database}.${config.schema}.${config.table}" - - private def sqlCreateTable(tableName: String): Fragment = sql""" - CREATE TABLE IF NOT EXISTS identifier($tableName) ( - app_id VARCHAR, - platform VARCHAR, - etl_tstamp TIMESTAMP, - collector_tstamp TIMESTAMP NOT NULL, - dvce_created_tstamp TIMESTAMP, - event VARCHAR, - event_id VARCHAR NOT NULL UNIQUE, - txn_id INTEGER, - name_tracker VARCHAR, - v_tracker VARCHAR, - v_collector VARCHAR NOT NULL, - v_etl VARCHAR NOT NULL, - user_id VARCHAR, - user_ipaddress VARCHAR, - user_fingerprint VARCHAR, - domain_userid VARCHAR, - domain_sessionidx SMALLINT, - network_userid VARCHAR, - geo_country VARCHAR, - geo_region VARCHAR, - geo_city VARCHAR, - geo_zipcode VARCHAR, - geo_latitude DOUBLE PRECISION, - geo_longitude DOUBLE PRECISION, - geo_region_name VARCHAR, - ip_isp VARCHAR, - ip_organization VARCHAR, - ip_domain VARCHAR, - ip_netspeed VARCHAR, - page_url VARCHAR, - page_title VARCHAR, - page_referrer VARCHAR, - page_urlscheme VARCHAR, - page_urlhost VARCHAR, - page_urlport INTEGER, - page_urlpath VARCHAR, - page_urlquery VARCHAR, - page_urlfragment VARCHAR, - refr_urlscheme VARCHAR, - refr_urlhost VARCHAR, - refr_urlport INTEGER, - refr_urlpath VARCHAR, - refr_urlquery VARCHAR, - refr_urlfragment VARCHAR, - refr_medium VARCHAR, - refr_source VARCHAR, - refr_term VARCHAR, - mkt_medium VARCHAR, - mkt_source VARCHAR, - mkt_term VARCHAR, - mkt_content VARCHAR, - mkt_campaign VARCHAR, - se_category VARCHAR, - se_action VARCHAR, - se_label VARCHAR, - se_property VARCHAR, - se_value DOUBLE PRECISION, - tr_orderid VARCHAR, - tr_affiliation VARCHAR, - tr_total NUMBER(18,2), - tr_tax NUMBER(18,2), - tr_shipping NUMBER(18,2), - tr_city VARCHAR, - tr_state VARCHAR, - tr_country VARCHAR, - ti_orderid VARCHAR, - ti_sku VARCHAR, - ti_name VARCHAR, - ti_category VARCHAR, - ti_price NUMBER(18,2), - ti_quantity INTEGER, - pp_xoffset_min INTEGER, - pp_xoffset_max INTEGER, - pp_yoffset_min INTEGER, - pp_yoffset_max INTEGER, - useragent VARCHAR, - br_name VARCHAR, - br_family VARCHAR, - br_version VARCHAR, - br_type VARCHAR, - br_renderengine VARCHAR, - br_lang VARCHAR, - br_features_pdf BOOLEAN, - br_features_flash BOOLEAN, - br_features_java BOOLEAN, - br_features_director BOOLEAN, - br_features_quicktime BOOLEAN, - br_features_realplayer BOOLEAN, - br_features_windowsmedia BOOLEAN, - br_features_gears BOOLEAN, - br_features_silverlight BOOLEAN, - br_cookies BOOLEAN, - br_colordepth VARCHAR, - br_viewwidth INTEGER, - br_viewheight INTEGER, - os_name VARCHAR, - os_family VARCHAR, - os_manufacturer VARCHAR, - os_timezone VARCHAR, - dvce_type VARCHAR, - dvce_ismobile BOOLEAN, - dvce_screenwidth INTEGER, - dvce_screenheight INTEGER, - doc_charset VARCHAR, - doc_width INTEGER, - doc_height INTEGER, - tr_currency VARCHAR, - tr_total_base NUMBER(18, 2), - tr_tax_base NUMBER(18, 2), - tr_shipping_base NUMBER(18, 2), - ti_currency VARCHAR, - ti_price_base NUMBER(18, 2), - base_currency VARCHAR, - geo_timezone VARCHAR, - mkt_clickid VARCHAR, - mkt_network VARCHAR, - etl_tags VARCHAR, - dvce_sent_tstamp TIMESTAMP, - refr_domain_userid VARCHAR, - refr_dvce_tstamp TIMESTAMP, - domain_sessionid VARCHAR, - derived_tstamp TIMESTAMP, - event_vendor VARCHAR, - event_name VARCHAR, - event_format VARCHAR, - event_version VARCHAR, - event_fingerprint VARCHAR, - true_tstamp TIMESTAMP, - load_tstamp TIMESTAMP, - CONSTRAINT event_id_pk PRIMARY KEY(event_id) - ) - """ -} diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala index ca103c1..50d0324 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala @@ -7,20 +7,21 @@ */ package com.snowplowanalytics.snowplow.snowflake.processing -import cats.implicits._ import cats.effect.{Async, Sync} -import doobie.{ConnectionIO, Fragment, Transactor} +import cats.implicits._ +import com.snowplowanalytics.snowplow.snowflake.{Config, JdbcTransactor, Monitoring} import doobie.implicits._ +import doobie.{ConnectionIO, Fragment} +import net.snowflake.client.jdbc.SnowflakeSQLException import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -import net.snowflake.client.jdbc.SnowflakeSQLException - -import com.snowplowanalytics.snowplow.snowflake.{Config, SQLUtils} import scala.util.matching.Regex trait TableManager[F[_]] { + def initializeEventsTable(): F[Unit] + def addColumns(columns: List[String]): F[Unit] } @@ -29,32 +30,58 @@ object TableManager { private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] - def fromTransactor[F[_]: Async]( + def make[F[_]: Async]( config: Config.Snowflake, - xa: Transactor[F], snowflakeHealth: SnowflakeHealth[F], - retriesConfig: Config.Retries - ): TableManager[F] = new TableManager[F] { - - def addColumns(columns: List[String]): F[Unit] = SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) { - Logger[F].info(s"Altering table to add columns [${columns.mkString(", ")}]") *> - xa.rawTrans.apply { - columns.traverse_ { col => - sqlAlterTable(config, col).update.run.void - .recoverWith { - case e: SnowflakeSQLException if e.getErrorCode === 1430 => - Logger[ConnectionIO].info(show"Column already exists: $col") - } + retriesConfig: Config.Retries, + monitoring: Monitoring[F] + ): F[TableManager[F]] = + JdbcTransactor.make(config).map { transactor => + new TableManager[F] { + + override def initializeEventsTable(): F[Unit] = + SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) { + Logger[F].info(s"Opening JDBC connection to ${config.url.getJdbcUrl}") *> + executeInitTableQuery() + .onError { cause => + monitoring.alert(Monitoring.Alert.FailedToCreateEventsTable(cause)) + } + } + + override def addColumns(columns: List[String]): F[Unit] = + SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) { + Logger[F].info(s"Altering table to add columns [${columns.mkString(", ")}]") *> + executeAddColumnsQuery(columns) } + + def executeInitTableQuery(): F[Unit] = { + val tableName = fqTableName(config) + + transactor.rawTrans + .apply { + Logger[ConnectionIO].info(s"Creating table $tableName if it does not already exist...") *> + sqlCreateTable(tableName).update.run.void + } } + + def executeAddColumnsQuery(columns: List[String]): F[Unit] = + transactor.rawTrans.apply { + columns.traverse_ { column => + sqlAlterTable(config, column).update.run.void + .recoverWith { + case e: SnowflakeSQLException if e.getErrorCode === 1430 => + Logger[ConnectionIO].info(show"Column already exists: $column") + } + } + } + } } - } private val reUnstruct: Regex = "^unstruct_event_.*$".r private val reContext: Regex = "^contexts_.*$".r private def sqlAlterTable(config: Config.Snowflake, colName: String): Fragment = { - val tableName = SQLUtils.fqTableName(config) + val tableName = fqTableName(config) val colType = colName match { case reUnstruct() => "OBJECT" case reContext() => "ARRAY" @@ -68,4 +95,143 @@ object TableManager { """ } + // fully qualified name + private def fqTableName(config: Config.Snowflake): String = + s"${config.database}.${config.schema}.${config.table}" + + private def sqlCreateTable(tableName: String): Fragment = + sql""" + CREATE TABLE IF NOT EXISTS identifier($tableName) ( + app_id VARCHAR, + platform VARCHAR, + etl_tstamp TIMESTAMP, + collector_tstamp TIMESTAMP NOT NULL, + dvce_created_tstamp TIMESTAMP, + event VARCHAR, + event_id VARCHAR NOT NULL UNIQUE, + txn_id INTEGER, + name_tracker VARCHAR, + v_tracker VARCHAR, + v_collector VARCHAR NOT NULL, + v_etl VARCHAR NOT NULL, + user_id VARCHAR, + user_ipaddress VARCHAR, + user_fingerprint VARCHAR, + domain_userid VARCHAR, + domain_sessionidx SMALLINT, + network_userid VARCHAR, + geo_country VARCHAR, + geo_region VARCHAR, + geo_city VARCHAR, + geo_zipcode VARCHAR, + geo_latitude DOUBLE PRECISION, + geo_longitude DOUBLE PRECISION, + geo_region_name VARCHAR, + ip_isp VARCHAR, + ip_organization VARCHAR, + ip_domain VARCHAR, + ip_netspeed VARCHAR, + page_url VARCHAR, + page_title VARCHAR, + page_referrer VARCHAR, + page_urlscheme VARCHAR, + page_urlhost VARCHAR, + page_urlport INTEGER, + page_urlpath VARCHAR, + page_urlquery VARCHAR, + page_urlfragment VARCHAR, + refr_urlscheme VARCHAR, + refr_urlhost VARCHAR, + refr_urlport INTEGER, + refr_urlpath VARCHAR, + refr_urlquery VARCHAR, + refr_urlfragment VARCHAR, + refr_medium VARCHAR, + refr_source VARCHAR, + refr_term VARCHAR, + mkt_medium VARCHAR, + mkt_source VARCHAR, + mkt_term VARCHAR, + mkt_content VARCHAR, + mkt_campaign VARCHAR, + se_category VARCHAR, + se_action VARCHAR, + se_label VARCHAR, + se_property VARCHAR, + se_value DOUBLE PRECISION, + tr_orderid VARCHAR, + tr_affiliation VARCHAR, + tr_total NUMBER(18,2), + tr_tax NUMBER(18,2), + tr_shipping NUMBER(18,2), + tr_city VARCHAR, + tr_state VARCHAR, + tr_country VARCHAR, + ti_orderid VARCHAR, + ti_sku VARCHAR, + ti_name VARCHAR, + ti_category VARCHAR, + ti_price NUMBER(18,2), + ti_quantity INTEGER, + pp_xoffset_min INTEGER, + pp_xoffset_max INTEGER, + pp_yoffset_min INTEGER, + pp_yoffset_max INTEGER, + useragent VARCHAR, + br_name VARCHAR, + br_family VARCHAR, + br_version VARCHAR, + br_type VARCHAR, + br_renderengine VARCHAR, + br_lang VARCHAR, + br_features_pdf BOOLEAN, + br_features_flash BOOLEAN, + br_features_java BOOLEAN, + br_features_director BOOLEAN, + br_features_quicktime BOOLEAN, + br_features_realplayer BOOLEAN, + br_features_windowsmedia BOOLEAN, + br_features_gears BOOLEAN, + br_features_silverlight BOOLEAN, + br_cookies BOOLEAN, + br_colordepth VARCHAR, + br_viewwidth INTEGER, + br_viewheight INTEGER, + os_name VARCHAR, + os_family VARCHAR, + os_manufacturer VARCHAR, + os_timezone VARCHAR, + dvce_type VARCHAR, + dvce_ismobile BOOLEAN, + dvce_screenwidth INTEGER, + dvce_screenheight INTEGER, + doc_charset VARCHAR, + doc_width INTEGER, + doc_height INTEGER, + tr_currency VARCHAR, + tr_total_base NUMBER(18, 2), + tr_tax_base NUMBER(18, 2), + tr_shipping_base NUMBER(18, 2), + ti_currency VARCHAR, + ti_price_base NUMBER(18, 2), + base_currency VARCHAR, + geo_timezone VARCHAR, + mkt_clickid VARCHAR, + mkt_network VARCHAR, + etl_tags VARCHAR, + dvce_sent_tstamp TIMESTAMP, + refr_domain_userid VARCHAR, + refr_dvce_tstamp TIMESTAMP, + domain_sessionid VARCHAR, + derived_tstamp TIMESTAMP, + event_vendor VARCHAR, + event_name VARCHAR, + event_format VARCHAR, + event_version VARCHAR, + event_fingerprint VARCHAR, + true_tstamp TIMESTAMP, + load_tstamp TIMESTAMP, + CONSTRAINT event_id_pk PRIMARY KEY(event_id) + ) + """ } diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala index ba8d841..7066ecd 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala @@ -25,6 +25,7 @@ object MockEnvironment { sealed trait Action object Action { + case object InitEventsTable extends Action case class Checkpointed(tokens: List[Unique.Token]) extends Action case class SentToBad(count: Int) extends Action case class AlterTableAddedColumns(columns: List[String]) extends Action @@ -57,7 +58,7 @@ object MockEnvironment { source = testSourceAndAck(inputs, state), badSink = testSink(state), httpClient = testHttpClient, - tblManager = testTableManager(state), + tableManager = testTableManager(state), channelProvider = channelProvider, metrics = testMetrics(state), batching = Config.Batching( @@ -65,7 +66,8 @@ object MockEnvironment { maxDelay = 10.seconds, uploadConcurrency = 1 ), - schemasToSkip = List.empty + schemasToSkip = List.empty, + monitoring = testMonitoring() ) MockEnvironment(state, env) } @@ -78,6 +80,10 @@ object MockEnvironment { } private def testTableManager(state: Ref[IO, Vector[Action]]): TableManager[IO] = new TableManager[IO] { + + override def initializeEventsTable(): IO[Unit] = + state.update(_ :+ InitEventsTable) + def addColumns(columns: List[String]): IO[Unit] = state.update(_ :+ AlterTableAddedColumns(columns)) } @@ -159,4 +165,10 @@ object MockEnvironment { def report: Stream[IO, Nothing] = Stream.never[IO] } + + private def testMonitoring(): Monitoring[IO] = new Monitoring[IO] { + override def alert(message: Monitoring.Alert): IO[Unit] = IO.unit + + override def reportFatal(exception: Throwable): IO[Unit] = IO.unit + } } diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala index 7a33847..a7dbce8 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala @@ -47,6 +47,7 @@ class ProcessingSpec extends Specification with CatsEffect { state <- control.state.get } yield state should beEqualTo( Vector( + Action.InitEventsTable, Action.WroteRowsToSnowflake(4), Action.AddedGoodCountMetric(4), Action.AddedBadCountMetric(0), @@ -62,6 +63,7 @@ class ProcessingSpec extends Specification with CatsEffect { state <- control.state.get } yield state should beEqualTo( Vector( + Action.InitEventsTable, Action.SentToBad(6), Action.AddedGoodCountMetric(0), Action.AddedBadCountMetric(6), @@ -81,6 +83,7 @@ class ProcessingSpec extends Specification with CatsEffect { state <- control.state.get } yield state should beEqualTo( Vector( + Action.InitEventsTable, Action.WroteRowsToSnowflake(6), Action.SentToBad(6), Action.AddedGoodCountMetric(6), @@ -106,6 +109,7 @@ class ProcessingSpec extends Specification with CatsEffect { state <- control.state.get } yield state should beEqualTo( Vector( + Action.InitEventsTable, Action.WroteRowsToSnowflake(1), Action.ClosedChannel, Action.AlterTableAddedColumns(List("unstruct_event_xyz_1", "contexts_abc_2")), @@ -135,6 +139,7 @@ class ProcessingSpec extends Specification with CatsEffect { state <- control.state.get } yield state should beEqualTo( Vector( + Action.InitEventsTable, Action.WroteRowsToSnowflake(1), Action.SentToBad(1), Action.AddedGoodCountMetric(1), @@ -161,6 +166,7 @@ class ProcessingSpec extends Specification with CatsEffect { state <- control.state.get } yield state should beEqualTo( Vector( + Action.InitEventsTable, Action.WroteRowsToSnowflake(1) ) ) @@ -179,6 +185,7 @@ class ProcessingSpec extends Specification with CatsEffect { state <- control.state.get } yield state should beEqualTo( Vector( + Action.InitEventsTable, Action.ClosedChannel, Action.OpenedChannel, Action.WroteRowsToSnowflake(2), @@ -205,6 +212,7 @@ class ProcessingSpec extends Specification with CatsEffect { state <- control.state.get } yield state should beEqualTo( Vector( + Action.InitEventsTable, Action.SetLatencyMetric(42123), Action.SetLatencyMetric(42123), Action.WroteRowsToSnowflake(4),