From 54ee7538aa46fdf61219f853031b12ce6b66c439 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Poniedzia=C5=82ek?= Date: Fri, 2 Feb 2024 11:38:55 +0100 Subject: [PATCH] Upgrade collector to 3.2.0 and enrich to 4.2.0 New collector and enrich versions mean: * Cats Effect 3 * Http4s as web server, so no more Akka HTTP * Using IO monad instead of Id To be consistent with other Snowplow applications, this commit also brings small improvements to configuration loading. Just like collector configuration, iglu resolver and all enrichments support HOCON configuration format. As collector 3.2.x is on scala 2.13 and enrich 4.X on scala 2.12, it was easier to cross compile collector to 2.12 than upgrade enrich to 2.13. That's why micro is still on scala 2.12. --- .github/workflows/deploy.yml | 4 +- .github/workflows/test.yml | 4 +- README.md | 4 +- build.sbt | 21 +- project/Dependencies.scala | 23 +- project/plugins.sbt | 13 +- src/main/resources/application.conf | 249 ---------------- src/main/resources/collector-micro.conf | 18 ++ ...solver.json => default-iglu-resolver.conf} | 0 .../decode/ValueConverter.scala | 2 +- .../CirceSupport.scala | 97 ------- .../ConfigHelper.scala | 234 --------------- .../Configuration.scala | 266 ++++++++++++++++++ .../IdImplicits.scala | 29 -- .../IgluService.scala | 41 --- .../Main.scala | 103 +------ .../MemorySink.scala | 153 +++++----- .../MicroHttpServer.scala | 97 +++++++ .../Routing.scala | 206 +++++++------- .../Run.scala | 157 +++++++++++ .../ConfigHelperSpec.scala | 29 +- .../MemorySinkSpec.scala | 148 ++++++---- .../MicroApiSpec.scala | 85 ++++++ .../TestAdapterRegistry.scala | 154 ++++++++++ 24 files changed, 1116 insertions(+), 1021 deletions(-) delete mode 100644 src/main/resources/application.conf create mode 100644 src/main/resources/collector-micro.conf rename src/main/resources/{default-iglu-resolver.json => default-iglu-resolver.conf} (100%) delete mode 100644 src/main/scala/com.snowplowanalytics.snowplow.micro/CirceSupport.scala delete mode 100644 src/main/scala/com.snowplowanalytics.snowplow.micro/ConfigHelper.scala create mode 100644 src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala delete mode 100644 src/main/scala/com.snowplowanalytics.snowplow.micro/IdImplicits.scala delete mode 100644 src/main/scala/com.snowplowanalytics.snowplow.micro/IgluService.scala create mode 100644 src/main/scala/com.snowplowanalytics.snowplow.micro/MicroHttpServer.scala create mode 100644 src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala create mode 100644 src/test/scala/com.snowplowanalytics.snowplow.micro/MicroApiSpec.scala create mode 100644 src/test/scala/com.snowplowanalytics.snowplow.micro/TestAdapterRegistry.scala diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 3ee8f12..15246c4 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -28,9 +28,9 @@ jobs: - name: Publish collector locally run: | - git clone --branch 2.8.1 --depth 1 https://github.com/snowplow/stream-collector.git + git clone --branch 3.2.0 --depth 1 https://github.com/snowplow/stream-collector.git cd stream-collector - sbt publishLocal + sbt +publishLocal - name: Run sbt run: sbt test diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 93fc68b..39aff62 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,9 +18,9 @@ jobs: - name: Publish collector locally run: | - git clone --branch 2.8.1 --depth 1 https://github.com/snowplow/stream-collector.git + git clone --branch 3.2.0 --depth 1 https://github.com/snowplow/stream-collector.git cd stream-collector - sbt publishLocal + sbt +publishLocal - name: Run sbt run: sbt test diff --git a/README.md b/README.md index 5e30eea..b4b56fa 100644 --- a/README.md +++ b/README.md @@ -23,9 +23,9 @@ Assuming [Git][git] and [sbt][sbt]: git clone git@github.com:snowplow-incubator/snowplow-micro.git cd snowplow-micro -git clone --branch 2.8.1 --depth 1 git@github.com:snowplow/stream-collector.git +git clone --branch 3.2.0 --depth 1 git@github.com:snowplow/stream-collector.git cd stream-collector -sbt publishLocal && cd .. +sbt +publishLocal && cd .. sbt test ``` diff --git a/build.sbt b/build.sbt index e2b80da..9705da7 100644 --- a/build.sbt +++ b/build.sbt @@ -6,7 +6,9 @@ * Copyright (c) 2019-2022 Snowplow Analytics Ltd. All rights reserved. */ import com.typesafe.sbt.packager.MappingsHelper.directory -import com.typesafe.sbt.packager.docker._ +import com.typesafe.sbt.packager.docker.* + +import scala.collection.Seq lazy val buildSettings = Seq( name := "snowplow-micro", @@ -20,29 +22,31 @@ lazy val buildSettings = Seq( baseDirectory.value / "ui" / "out" ), Compile / unmanagedResources += file("LICENSE.md"), - resolvers ++= Dependencies.resolvers + resolvers ++= Dependencies.resolvers, + Test / parallelExecution := false ) lazy val dependencies = Seq( libraryDependencies ++= Seq( Dependencies.snowplowStreamCollector, Dependencies.snowplowCommonEnrich, + Dependencies.decline, + Dependencies.http4sCirce, Dependencies.circeJawn, Dependencies.circeGeneric, Dependencies.specs2, + Dependencies.specs2CE, Dependencies.badRows ) ) -lazy val exclusions = Seq( - excludeDependencies ++= Dependencies.exclusions -) - lazy val buildInfoSettings = Seq( - buildInfoKeys := Seq[BuildInfoKey](organization, name, version, scalaVersion), - buildInfoPackage := "buildinfo" + buildInfoKeys := Seq[BuildInfoKey](name, moduleName, dockerAlias, version, "shortName" -> "micro-ssc"), + buildInfoPackage := "com.snowplowanalytics.snowplow.micro", + buildInfoOptions += BuildInfoOption.Traits("com.snowplowanalytics.snowplow.collector.core.AppInfo") ) + lazy val dynVerSettings = Seq( ThisBuild / dynverVTagPrefix := false, // Otherwise git tags required to have v-prefix ThisBuild / dynverSeparator := "-" // to be compatible with docker @@ -50,7 +54,6 @@ lazy val dynVerSettings = Seq( lazy val commonSettings = dependencies ++ - exclusions ++ buildSettings ++ buildInfoSettings ++ dynVerSettings ++ diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 84433ab..bcfcb6d 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -16,33 +16,36 @@ object Dependencies { object V { // Snowplow - val snowplowStreamCollector = "2.8.1" - val snowplowCommonEnrich = "3.8.0" + val snowplowStreamCollector = "3.2.0" + val snowplowCommonEnrich = "4.2.0" + val http4sCirce = "0.23.23" + val decline = "2.4.1" + // circe val circe = "0.14.2" // specs2 val specs2 = "4.12.2" + val specs2CE = "1.5.0" // force versions of transitive dependencies val badRows = "2.2.0" } - val exclusions = Seq( - "org.apache.tomcat.embed" % "tomcat-embed-core" - ) - - // Snowplow stream collector - val snowplowStreamCollector = "com.snowplowanalytics" %% "snowplow-stream-collector-core" % V.snowplowStreamCollector - val snowplowCommonEnrich = "com.snowplowanalytics" %% "snowplow-common-enrich" % V.snowplowCommonEnrich + val snowplowStreamCollector = "com.snowplowanalytics" %% "snowplow-stream-collector-http4s-core" % V.snowplowStreamCollector + val snowplowCommonEnrich = "com.snowplowanalytics" %% "snowplow-common-enrich" % V.snowplowCommonEnrich + + val http4sCirce = "org.http4s" %% "http4s-circe" % V.http4sCirce + val decline = "com.monovore" %% "decline-effect" % V.decline // circe val circeJawn = "io.circe" %% "circe-jawn" % V.circe val circeGeneric = "io.circe" %% "circe-generic" % V.circe // specs2 - val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test + val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test + val specs2CE = "org.typelevel" %% "cats-effect-testing-specs2" % V.specs2CE % Test // transitive val badRows = "com.snowplowanalytics" %% "snowplow-badrows" % V.badRows diff --git a/project/plugins.sbt b/project/plugins.sbt index 3e59965..f375a39 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,7 +1,6 @@ -logLevel := Level.Warn - -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.0.0") -addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.8.2") -addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.3") -addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.10.0") -addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.1.1") +addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.4.2") +addSbtPlugin("com.snowplowanalytics" % "sbt-snowplow-release" % "0.3.1") +addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0") +addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.1.1") +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.0.0") +addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0") diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf deleted file mode 100644 index 3d062bd..0000000 --- a/src/main/resources/application.conf +++ /dev/null @@ -1,249 +0,0 @@ -# 'collector' contains configuration options for the main Scala collector. -collector { - # The collector runs as a web service specified on the following interface and port. - interface = "0.0.0.0" - port = "9090" - - # optional SSL/TLS configuration - ssl { - enable = false - # whether to redirect HTTP to HTTPS - redirect = false - port = 9543 - } - - # The collector responds with a cookie to requests with a path that matches the 'vendor/version' protocol. - # The expected values are: - # - com.snowplowanalytics.snowplow/tp2 for Tracker Protocol 2 - # - r/tp2 for redirects - # - com.snowplowanalytics.iglu/v1 for the Iglu Webhook - # Any path that matches the 'vendor/version' protocol will result in a cookie response, for use by custom webhooks - # downstream of the collector. - # But you can also map any valid (i.e. two-segment) path to one of the three defaults. - # Your custom path must be the key and the value must be one of the corresponding default paths. Both must be full - # valid paths starting with a leading slash. - # Pass in an empty map to avoid mapping. - paths { - # "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2" - # "/com.acme/redirect" = "/r/tp2" - # "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1" - } - - # Configure the P3P policy header. - p3p { - policyRef = "/w3c/p3p.xml" - CP = "NOI DSP COR NID PSA OUR IND COM NAV STA" - } - - # Cross domain policy configuration. - # If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml - # route. - crossDomain { - enabled = false - # Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com - domains = [ "*" ] - # Whether to only grant access to HTTPS or both HTTPS and HTTP sources - secure = true - } - - # The collector returns a cookie to clients for user identification - # with the following domain and expiration. - cookie { - enabled = true - expiration = "365 days" - # Network cookie name - name = "micro" - # The domain is optional and will make the cookie accessible to other - # applications on the domain. Comment out these lines to tie cookies to - # the collector's full domain. - # The domain is determined by matching the domains from the Origin header of the request - # to the list below. The first match is used. If no matches are found, the fallback domain will be used, - # if configured. - # If you specify a main domain, all subdomains on it will be matched. - # If you specify a subdomain, only that subdomain will be matched. - # Examples: - # domain.com will match domain.com, www.domain.com and secure.client.domain.com - # client.domain.com will match secure.client.domain.com but not domain.com or www.domain.com - domains = [ - # "{{cookieDomain1}}" # e.g. "domain.com" -> any origin domain ending with this will be matched and domain.com will be returned - # "{{cookieDomain2}}" # e.g. "secure.anotherdomain.com" -> any origin domain ending with this will be matched and secure.anotherdomain.com will be returned - # ... more domains - ] - # ... more domains - # If specified, the fallback domain will be used if none of the Origin header hosts matches the list of - # cookie domains configured above. (For example, if there is no Origin header.) - # fallback-domain = "{{fallbackDomain}}" - secure = false - httpOnly = false - # The sameSite is optional. You can choose to not specify the attribute, or you can use `Strict`, - # `Lax` or `None` to limit the cookie sent context. - # Strict: the cookie will only be sent along with "same-site" requests. - # Lax: the cookie will be sent with same-site requests, and with cross-site top-level navigation. - # None: the cookie will be sent with same-site and cross-site requests. - # sameSite = "{{cookieSameSite}}" - } - - # If you have a do not track cookie in place, the Scala Stream Collector can respect it by - # completely bypassing the processing of an incoming request carrying this cookie, the collector - # will simply reply by a 200 saying "do not track". - # The cookie name and value must match the configuration below, where the names of the cookies must - # match entirely and the value could be a regular expression. - doNotTrackCookie { - enabled = false - name = "foo" - value = "bar" - } - - # When enabled and the cookie specified above is missing, performs a redirect to itself to check - # if third-party cookies are blocked using the specified name. If they are indeed blocked, - # fallbackNetworkId is used instead of generating a new random one. - cookieBounce { - enabled = false - # The name of the request parameter which will be used on redirects checking that third-party - # cookies work. - name = "n3pc" - # Network user id to fallback to when third-party cookies are blocked. - fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000" - # Optionally, specify the name of the header containing the originating protocol for use in the - # bounce redirect location. Use this if behind a load balancer that performs SSL termination. - # The value of this header must be http or https. Example, if behind an AWS Classic ELB. - # forwardedProtocolHeader = "X-Forwarded-Proto" - } - - # When enabled, redirect prefix `r/` will be enabled and its query parameters resolved. - # Otherwise the request prefixed with `r/` will be dropped with `404 Not Found` - # Custom redirects configured in `paths` can still be used. - # enableDefaultRedirect = true - - # When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder - # token. All instances of that token are replaced withe the network ID. If the placeholder isn't - # specified, the default value is `${SP_NUID}`. - redirectMacro { - enabled = false - # Optional custom placeholder token (defaults to the literal `${SP_NUID}`) - placeholder = "[TOKEN]" - } - - # Customize response handling for requests for the root path ("/"). - # Useful if you need to redirect to web content or privacy policies regarding the use of this collector. - rootResponse { - enabled = false - statusCode = 302 - # Optional, defaults to empty map - headers = { - Location = "https://127.0.0.1/", - X-Custom = "something" - } - # Optional, defaults to empty string - body = "302, redirecting" - } - - # Configuration related to CORS preflight requests - cors { - # The Access-Control-Max-Age response header indicates how long the results of a preflight - # request can be cached. -1 seconds disables the cache. Chromium max is 10m, Firefox is 24h. - accessControlMaxAge = 5 seconds - } - - - monitoring.metrics.statsd { - enabled = false - # StatsD metric reporting protocol configuration - hostname = localhost - port = 8125 - # Required, how frequently to report metrics - period = "10 seconds" - # Optional, override the default metric prefix - # "prefix": "snowplow.collector" - } - - streams { - # Events which have successfully been collected will be stored in the good stream/topic - good = "" - - # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic - bad = "" - - # Whether to use the incoming event's ip as the partition key for the good stream/topic - # Note: Nsq does not make use of partition key. - useIpAddressAsPartitionKey = false - - # Enable the chosen sink by uncommenting the appropriate configuration - sink { - # Choose between kinesis, googlepubsub, kafka, nsq, or stdout. - # To use stdout, comment or remove everything in the "collector.streams.sink" section except - # "enabled" which should be set to "stdout". - enabled = stdout - - } - - # Incoming events are stored in a buffer before being sent to Kinesis/Kafka. - # Note: Buffering is not supported by NSQ. - # The buffer is emptied whenever: - # - the number of stored records reaches record-limit or - # - the combined size of the stored records reaches byte-limit or - # - the time in milliseconds since the buffer was last emptied reaches time-limit - buffer { - byteLimit = 100000 - recordLimit = 40 - timeLimit = 1000 - } - } - - enableDefaultRedirect = false - redirectDomains = [] - enableStartupChecks = true - terminationDeadline = 10.seconds - preTerminationPeriod = 0.seconds - preTerminationUnhealthy = false - experimental { - warmup { - enable = false - numRequests = 2000 - maxConnections = 2000 - maxCycles = 1 - } - } -} - -# Akka has a variety of possible configuration options defined at -# http://doc.akka.io/docs/akka/current/scala/general/configuration.html -akka { - loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging. - loggers = ["akka.event.slf4j.Slf4jLogger"] - - # akka-http is the server the Stream collector uses and has configurable options defined at - # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html - http.server { - # To obtain the hostname in the collector, the 'remote-address' header - # should be set. By default, this is disabled, and enabling it - # adds the 'Remote-Address' header to every request automatically. - remote-address-header = on - - raw-request-uri-header = on - - # Define the maximum request length (the default is 2048) - parsing { - max-uri-length = 32768 - uri-parsing-mode = relaxed - } - } - - # By default setting `collector.ssl` relies on JSSE (Java Secure Socket - # Extension) to enable secure communication. - # To override the default settings set the following section as per - # https://lightbend.github.io/ssl-config/ExampleSSLConfig.html - # ssl-config { - # debug = { - # ssl = true - # } - # keyManager = { - # stores = [ - # {type = "PKCS12", classpath = false, path = "/etc/ssl/mycert.p12", password = "mypassword" } - # ] - # } - # loose { - # disableHostnameVerification = false - # } - # } -} diff --git a/src/main/resources/collector-micro.conf b/src/main/resources/collector-micro.conf new file mode 100644 index 0000000..e7c6364 --- /dev/null +++ b/src/main/resources/collector-micro.conf @@ -0,0 +1,18 @@ +collector { + interface = "0.0.0.0" + port = 9090 + ssl { + port = 9543 + } + + streams { + good = "good" + bad = "bad" + buffer { + byteLimit = 3145728 + recordLimit = 500 + timeLimit = 5000 + } + sink {} + } +} \ No newline at end of file diff --git a/src/main/resources/default-iglu-resolver.json b/src/main/resources/default-iglu-resolver.conf similarity index 100% rename from src/main/resources/default-iglu-resolver.json rename to src/main/resources/default-iglu-resolver.conf diff --git a/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/ValueConverter.scala b/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/ValueConverter.scala index d62bd6b..325aaa6 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/ValueConverter.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/ValueConverter.scala @@ -39,7 +39,7 @@ object ValueConverter { ofFunc(_ => f(_).asRight) implicit def valueDecoderCase[A](implicit decoder: ValueDecoder[A]): Aux[Option[String], A] = - ofFunc(key => x => decoder.parse(key, x.getOrElse(""))) + ofFunc(key => x => decoder.parse(key, x.getOrElse(""), None)) implicit def floatDoubleCase: Aux[Option[Float], Option[Double]] = simple(_.map(_.toDouble)) diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/CirceSupport.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/CirceSupport.scala deleted file mode 100644 index a0e313a..0000000 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/CirceSupport.scala +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (c) 2019-2022 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ -package com.snowplowanalytics.snowplow.micro - -import akka.http.scaladsl.marshalling.{Marshaller, ToEntityMarshaller} -import akka.http.scaladsl.model.{ContentType, ContentTypeRange, HttpEntity} -import akka.http.scaladsl.model.MediaType -import akka.http.scaladsl.model.MediaTypes.`application/json` -import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller} -import akka.util.ByteString - -import io.circe.{Decoder, Encoder, Json, Printer, jawn} - -import scala.collection.immutable.Seq - -import org.joda.time.DateTime - -import org.apache.http.NameValuePair - -/** Add support for unmarshalling HTTP JSON requests - * and marshalling HTTP JSON responses, using circe library. - * More information about marshalling can be found here - * https://doc.akka.io/docs/akka-http/current/common/marshalling.html. - * - * This code mostly comes from https://github.com/hseeberger/akka-http-json. - */ -private[micro] object CirceSupport { - - // To encode the datetime in a CollectorPayload - implicit val dateTimeEncoder: Encoder[DateTime] = - Encoder[String].contramap(_.toString) - - // To encode the querystring in a CollectorPayload - implicit val nameValuePairEncoder: Encoder[NameValuePair] = - Encoder[String].contramap(kv => s"${kv.getName()}=${kv.getValue()}") - - def unmarshallerContentTypes: Seq[ContentTypeRange] = - mediaTypes.map(ContentTypeRange.apply) - - def mediaTypes: Seq[MediaType.WithFixedCharset] = - List(`application/json`) - - /** `Json` => HTTP entity - * @return marshaller for JSON value - */ - implicit final def jsonMarshaller( - implicit printer: Printer = Printer.noSpaces - ): ToEntityMarshaller[Json] = - Marshaller.oneOf(mediaTypes: _*) { mediaType => - Marshaller.withFixedContentType(ContentType(mediaType)) { json => - HttpEntity( - mediaType, - ByteString( - printer.printToByteBuffer(json, mediaType.charset.nioCharset()))) - } - } - - /** `A` => HTTP entity - * @tparam A type to encode - * @return marshaller for any `A` value - */ - implicit final def marshaller[A: Encoder]( - implicit printer: Printer = Printer.noSpaces - ): ToEntityMarshaller[A] = - jsonMarshaller(printer).compose(Encoder[A].apply) - - /** HTTP entity => `Json` - * @return unmarshaller for `Json` - */ - implicit final val jsonUnmarshaller: FromEntityUnmarshaller[Json] = - Unmarshaller.byteStringUnmarshaller - .forContentTypes(unmarshallerContentTypes: _*) - .map { - case ByteString.empty => throw Unmarshaller.NoContentException - case data => - jawn.parseByteBuffer(data.asByteBuffer).fold(throw _, identity) - } - - /** HTTP entity => `A` - * @tparam A type to decode - * @return unmarshaller for `A` - */ - implicit def unmarshaller[A: Decoder]: FromEntityUnmarshaller[A] = { - def decode(json: Json) = Decoder[A].decodeJson(json).fold(throw _, identity) - jsonUnmarshaller.map(decode) - } -} diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/ConfigHelper.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/ConfigHelper.scala deleted file mode 100644 index 2ef2ba4..0000000 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/ConfigHelper.scala +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Copyright (c) 2019-2022 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ -package com.snowplowanalytics.snowplow.micro - -import cats.Id -import cats.implicits._ -import com.snowplowanalytics.iglu.client.IgluCirceClient -import com.snowplowanalytics.iglu.client.resolver.Resolver -import com.snowplowanalytics.iglu.client.resolver.registries.Registry -import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ -import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} -import com.snowplowanalytics.snowplow.collectors.scalastream.model.{CollectorConfig, SinkConfig} -import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry -import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf -import com.snowplowanalytics.snowplow.enrich.common.utils.JsonUtils -import com.snowplowanalytics.snowplow.micro.IdImplicits._ -import com.typesafe.config.{Config, ConfigFactory} -import io.circe.Json -import io.circe.parser.parse -import io.circe.syntax._ -import pureconfig.generic.auto._ -import pureconfig.generic.{FieldCoproductHint, ProductHint} -import pureconfig.{CamelCase, ConfigFieldMapping, ConfigSource} - -import java.io.File -import java.net.URI -import java.nio.file.{Path, Paths} -import java.security.{KeyStore, SecureRandom} -import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory} -import scala.io.Source - -/** Contain functions to parse the command line arguments, - * to parse the configuration for the collector, Akka HTTP and Iglu - * and to instantiate Iglu client. - */ -private[micro] object ConfigHelper { - object EnvironmentVariables { - val igluRegistryUrl = "MICRO_IGLU_REGISTRY_URL" - val igluApiKey = "MICRO_IGLU_API_KEY" - val sslCertificatePassword = "MICRO_SSL_CERT_PASSWORD" - } - - implicit def hint[T] = - ProductHint[T](ConfigFieldMapping(CamelCase, CamelCase)) - - // Copied from Enrich - necessary for parsing enrichment configs - implicit val sinkConfigHint = new FieldCoproductHint[SinkConfig]("enabled") - type EitherS[A] = Either[String, A] - - case class MicroConfig( - collectorConfig: CollectorConfig, - igluResolver: Resolver[Id], - igluClient: IgluCirceClient[Id], - enrichmentConfigs: List[EnrichmentConf], - akkaConfig: Config, - sslContext: Option[SSLContext], - outputEnrichedTsv: Boolean - ) - - /** Parse the command line arguments and the configuration files. */ - def parseConfig(args: Array[String]): MicroConfig = { - case class CommandLineOptions( - collectorConfigFile: Option[File] = None, - igluConfigFile: Option[File] = None, - outputEnrichedTsv: Boolean = false - ) - - def formatEnvironmentVariables(descriptions: (String, String)*): String = { - val longest = descriptions.map(_._1.length).max - descriptions.map { - case (envVar, desc) => s" $envVar${" " * (longest - envVar.length)} $desc" - }.mkString("\n") - } - - val parser = new scopt.OptionParser[CommandLineOptions](buildinfo.BuildInfo.name) { - head(buildinfo.BuildInfo.name, buildinfo.BuildInfo.version) - help("help") - version("version") - opt[Option[File]]("collector-config") - .optional() - .valueName("") - .text("Configuration file for collector") - .action((f: Option[File], c: CommandLineOptions) => c.copy(collectorConfigFile = f)) - .validate(f => - f match { - case Some(file) => - if (file.exists) success - else failure(s"Configuration file $f does not exist") - case None => success - } - ) - opt[Option[File]]("iglu") - .optional() - .valueName("") - .text("Configuration file for Iglu Client") - .action((f: Option[File], c: CommandLineOptions) => c.copy(igluConfigFile = f)) - .validate(f => - f match { - case Some(file) => - if (file.exists) success - else failure(s"Configuration file $f does not exist") - case None => success - } - ) - opt[Unit]('t', "output-tsv") - .optional() - .text("Print events in TSV format to standard output") - .action((_, c: CommandLineOptions) => c.copy(outputEnrichedTsv = true)) - note( - "\nSupported environment variables:\n\n" + formatEnvironmentVariables( - EnvironmentVariables.igluRegistryUrl -> - "The URL for an additional custom Iglu registry", - EnvironmentVariables.igluApiKey -> - s"An optional API key for an Iglu registry defined with ${EnvironmentVariables.igluRegistryUrl}", - EnvironmentVariables.sslCertificatePassword -> - "The password for the optional SSL/TLS certificate in /config/ssl-certificate.p12. Enables HTTPS" - ) - ) - } - - val config = parser.parse(args, CommandLineOptions()) getOrElse { - throw new RuntimeException("Problem while parsing arguments") // should never be called - } - - val resolved = config.collectorConfigFile match { - case Some(f) => ConfigFactory.parseFile(f).resolve() - case None => ConfigFactory.empty() - } - - val collectorConfig = ConfigFactory.load(resolved.withFallback(ConfigFactory.load())) - - val resolverSource = config.igluConfigFile match { - case Some(f) => Source.fromFile(f) - case None => Source.fromResource("default-iglu-resolver.json") - } - - val extraRegistry = sys.env.get(EnvironmentVariables.igluRegistryUrl).map { registry => - val uri = URI.create(registry) - Registry.Http( - Registry.Config(s"Custom ($registry)", 0, List.empty), - Registry.HttpConnection(uri, sys.env.get(EnvironmentVariables.igluApiKey)) - ) - } - - val (resolver, igluClient) = getIgluClientFromSource(resolverSource, extraRegistry) match { - case Right(ok) => ok - case Left(e) => - throw new IllegalArgumentException(s"Error while reading Iglu config file: $e.") - } - - val enrichmentConfigs = Option(getClass.getResource("/enrichments")).map { dir => - getEnrichmentRegistryFromPath(Paths.get(dir.toURI), igluClient) match { - case Right(ok) => ok - case Left(e) => - throw new IllegalArgumentException(s"Error while reading enrichment config file(s): $e.") - } - }.getOrElse(List.empty) - - val sslContext = sys.env.get(EnvironmentVariables.sslCertificatePassword).map { password => - // Adapted from https://doc.akka.io/docs/akka-http/current/server-side/server-https-support.html. - // We could use SSLContext.getDefault instead of all of this, but then we would need to - // force the user to add arcane -D flags when running Micro, which is not the best experience. - val keystore = KeyStore.getInstance("PKCS12") - val certificateFile = getClass.getClassLoader.getResourceAsStream("ssl-certificate.p12") - keystore.load(certificateFile, password.toCharArray) - - val keyManagerFactory = KeyManagerFactory.getInstance("SunX509") - keyManagerFactory.init(keystore, password.toCharArray) - - val trustManagerFactory = TrustManagerFactory.getInstance("SunX509") - trustManagerFactory.init(keystore) - - val context = SSLContext.getInstance("TLS") - context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom) - - context - } - - MicroConfig( - ConfigSource.fromConfig(collectorConfig.getConfig("collector")).loadOrThrow[CollectorConfig], - resolver, - igluClient, - enrichmentConfigs, - collectorConfig, - sslContext, - config.outputEnrichedTsv - ) - } - - /** Instantiate an Iglu client from its configuration file. */ - def getIgluClientFromSource(igluConfigSource: Source, extraRegistry: Option[Registry]): Either[String, (Resolver[Id], IgluCirceClient[Id])] = - for { - text <- Either.catchNonFatal(igluConfigSource.mkString).leftMap(_.getMessage) - json <- parse(text).leftMap(_.show) - config <- Resolver.parseConfig(json).leftMap(_.show) - resolver <- Resolver.fromConfig[Id](config).leftMap(_.show).value - completeResolver = resolver.copy(repos = resolver.repos ++ extraRegistry) - } yield (completeResolver, IgluCirceClient.fromResolver[Id](completeResolver, config.cacheSize)) - - def getEnrichmentRegistryFromPath(path: Path, igluClient: IgluCirceClient[Id]) = { - val schemaKey = SchemaKey( - "com.snowplowanalytics.snowplow", - "enrichments", - "jsonschema", - SchemaVer.Full(1, 0, 0) - ) - // Loosely adapted from Enrich#localEnrichmentConfigsExtractor - val directory = Option(path.toFile.listFiles).fold(List.empty[File])(_.toList) - val configs = directory - .filter(_.getName.endsWith(".json")) - .map(scala.io.Source.fromFile(_).mkString) - .map(JsonUtils.extractJson).sequence[EitherS, Json] - .map(jsonConfigs => SelfDescribingData[Json](schemaKey, Json.fromValues(jsonConfigs)).asJson) - .flatMap { jsonConfig => - EnrichmentRegistry.parse(jsonConfig, igluClient, localMode = false) - .leftMap(_.toList.mkString("; ")).toEither - } - val scripts = directory - .filter(_.getName.endsWith(".js")) - .map(scala.io.Source.fromFile(_).mkString) - .map(EnrichmentConf.JavascriptScriptConf(schemaKey, _)) - configs.map(scripts ::: _) - } -} diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala new file mode 100644 index 0000000..1f1659b --- /dev/null +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala @@ -0,0 +1,266 @@ +/** + * Copyright (c) 2013-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ +package com.snowplowanalytics.snowplow.micro + +import cats.data.EitherT +import cats.effect.IO +import cats.implicits._ +import com.monovore.decline.Opts +import com.snowplowanalytics.iglu.client.IgluCirceClient +import com.snowplowanalytics.iglu.client.resolver.Resolver +import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig +import com.snowplowanalytics.iglu.client.resolver.registries.{JavaNetRegistryLookup, Registry} +import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} +import com.snowplowanalytics.snowplow.collector.core.{Config => CollectorConfig} +import com.snowplowanalytics.snowplow.enrich.common.adapters.{CallrailSchemas, CloudfrontAccessLogSchemas, GoogleAnalyticsSchemas, HubspotSchemas, MailchimpSchemas, MailgunSchemas, MandrillSchemas, MarketoSchemas, OlarkSchemas, PagerdutySchemas, PingdomSchemas, SendgridSchemas, StatusGatorSchemas, UnbounceSchemas, UrbanAirshipSchemas, VeroSchemas, AdaptersSchemas => EnrichAdaptersSchemas} +import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry +import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf +import com.typesafe.config.{ConfigFactory, ConfigParseOptions, Config => TypesafeConfig} +import fs2.io.file.{Files, Path => FS2Path} +import io.circe.config.syntax.CirceConfigOps +import io.circe.generic.semiauto.deriveDecoder +import io.circe.syntax.EncoderOps +import io.circe.{Decoder, Json, JsonObject} +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +import java.net.URI +import java.nio.file.{Path, Paths} + +object Configuration { + + object Cli { + final case class Config(collector: Option[Path], iglu: Option[Path], outputEnrichedTsv: Boolean) + + private val collector = Opts.option[Path]("collector-config", "Path to HOCON configuration (optional)", "c", "config.hocon").orNone + private val iglu = Opts.option[Path]("iglu", "Configuration file for Iglu Client", "i", "iglu.json").orNone + private val outputEnrichedTsv = Opts.flag("output-tsv", "Print events in TSV format to standard output", "t").orFalse + + val config: Opts[Config] = (collector, iglu, outputEnrichedTsv).mapN(Config.apply) + } + + + object EnvironmentVariables { + val igluRegistryUrl = "MICRO_IGLU_REGISTRY_URL" + val igluApiKey = "MICRO_IGLU_API_KEY" + val sslCertificatePassword = "MICRO_SSL_CERT_PASSWORD" + } + + final case class DummySinkConfig() + + type SinkConfig = DummySinkConfig + implicit val dec: Decoder[DummySinkConfig] = Decoder.instance(_ => Right(DummySinkConfig())) + + final case class MicroConfig(collector: CollectorConfig[SinkConfig], + iglu: IgluResources, + enrichmentsConfig: List[EnrichmentConf], + adaptersSchemas: AdaptersSchemas, + outputEnrichedTsv: Boolean) + + final case class AdaptersSchemas(adaptersSchemas: EnrichAdaptersSchemas) + + final case class IgluResources(resolver: Resolver[IO], client: IgluCirceClient[IO]) + + implicit private def logger: Logger[IO] = Slf4jLogger.getLogger[IO] + + def load(): Opts[EitherT[IO, String, MicroConfig]] = { + Cli.config.map { cliConfig => + for { + collectorConfig <- loadCollectorConfig(cliConfig.collector) + igluResources <- loadIgluResources(cliConfig.iglu) + enrichmentsConfig <- loadEnrichmentConfig(igluResources.client) + adaptersSchemas <- loadAdaptersSchemas() + } yield MicroConfig(collectorConfig, igluResources, enrichmentsConfig, adaptersSchemas, cliConfig.outputEnrichedTsv) + } + } + + private def loadCollectorConfig(path: Option[Path]): EitherT[IO, String, CollectorConfig[SinkConfig]] = { + val resolveOrder = (config: TypesafeConfig) => + namespaced(ConfigFactory.load(namespaced(config.withFallback(namespaced(ConfigFactory.parseResources("collector-micro.conf")))))) + + loadConfig[CollectorConfig[SinkConfig]](path, resolveOrder) + } + + private def loadIgluResources(path: Option[Path]): EitherT[IO, String, IgluResources] = { + val resolveOrder = (config: TypesafeConfig) => + config.withFallback(ConfigFactory.parseResources("default-iglu-resolver.conf")) + + loadConfig[ResolverConfig](path, resolveOrder) + .flatMap(buildIgluResources) + } + + private def loadEnrichmentConfig(igluClient: IgluCirceClient[IO]): EitherT[IO, String, List[EnrichmentConf]] = { + Option(getClass.getResource("/enrichments")) match { + case Some(definedEnrichments) => + val path = Paths.get(definedEnrichments.toURI) + for { + asJson <- loadEnrichmentsAsSDD(path, igluClient, fileType = ".json") + asHocon <- loadEnrichmentsAsSDD(path, igluClient, fileType = ".hocon") + asJSScripts <- loadJSScripts(path) + } yield asJson ::: asHocon ::: asJSScripts + case None => + EitherT.rightT[IO, String](List.empty) + } + } + + private def loadAdaptersSchemas(): EitherT[IO, String, AdaptersSchemas] = { + val resolveOrder = (config: TypesafeConfig) => ConfigFactory.load(config) + + //It's not configurable in micro, we load it from reference.conf provided by enrich + loadConfig[AdaptersSchemas](path = None, resolveOrder) + } + + private def buildIgluResources(resolverConfig: ResolverConfig): EitherT[IO, String, IgluResources] = + for { + resolver <- Resolver.fromConfig[IO](resolverConfig).leftMap(_.show) + completeResolver = resolver.copy(repos = resolver.repos ++ readIgluExtraRegistry()) + client <- EitherT.liftF(IgluCirceClient.fromResolver[IO](completeResolver, resolverConfig.cacheSize)) + } yield IgluResources(resolver, client) + + private def loadEnrichmentsAsSDD(enrichmentsDirectory: Path, + igluClient: IgluCirceClient[IO], + fileType: String): EitherT[IO, String, List[EnrichmentConf]] = { + listAvailableEnrichments(enrichmentsDirectory, fileType) + .flatMap(loadEnrichmentsAsJsons) + .map(asSDD) + .flatMap(parseEnrichments(igluClient)) + } + + private def loadJSScripts(enrichmentsDirectory: Path): EitherT[IO, String, List[EnrichmentConf]] = EitherT.right { + listFiles(enrichmentsDirectory, fileType = ".js") + .flatMap { scripts => + scripts.traverse(buildJSConfig) + } + } + + private def buildJSConfig(script: FS2Path): IO[EnrichmentConf.JavascriptScriptConf] = { + val schemaKey = SchemaKey("com.snowplowanalytics.snowplow", "javascript_script_config", "jsonschema", SchemaVer.Full(1, 0, 0)) + Files[IO] + .readUtf8Lines(script) + .compile + .toList + .map(lines => EnrichmentConf.JavascriptScriptConf(schemaKey, lines.mkString("\n"), JsonObject.empty)) + } + + private def listAvailableEnrichments(enrichmentsDirectory: Path, fileType: String) = { + listFiles(enrichmentsDirectory, fileType) + .attemptT + .leftMap(e => show"Cannot list ${enrichmentsDirectory.toAbsolutePath.toString} directory with JSON: ${e.getMessage}") + } + + private def listFiles(path: Path, fileType: String): IO[List[FS2Path]] = { + Files[IO].list(fs2.io.file.Path.fromNioPath(path)) + .filter(path => path.toString.endsWith(fileType)) + .compile + .toList + .flatTap(files => logger.info(s"Files with extension: '$fileType' found in $path: ${files.mkString("[", ", ", "]")}")) + } + + private def loadEnrichmentsAsJsons(enrichments: List[FS2Path]): EitherT[IO, String, List[Json]] = { + enrichments.traverse { enrichmentPath => + loadConfig[Json](Some(enrichmentPath.toNioPath), identity) + } + } + + private def asSDD(jsons: List[Json]): SelfDescribingData[Json] = { + val schema = SchemaKey("com.snowplowanalytics.snowplow", "enrichments", "jsonschema", SchemaVer.Full(1, 0, 0)) + SelfDescribingData(schema, Json.arr(jsons: _*)) + } + + private def parseEnrichments(igluClient: IgluCirceClient[IO])(sdd: SelfDescribingData[Json]): EitherT[IO, String, List[EnrichmentConf]] = + EitherT { + EnrichmentRegistry + .parse[IO](sdd.asJson, igluClient, localMode = false, registryLookup = JavaNetRegistryLookup.ioLookupInstance[IO]) + .map(_.toEither) + }.leftMap { x => + show"Cannot decode enrichments - ${x.mkString_(", ")}" + } + + private def readIgluExtraRegistry(): Option[Registry.Http] = { + sys.env.get(EnvironmentVariables.igluRegistryUrl).map { registry => + val uri = URI.create(registry) + Registry.Http( + Registry.Config(s"Custom ($registry)", 0, List.empty), + Registry.HttpConnection(uri, sys.env.get(EnvironmentVariables.igluApiKey)) + ) + } + } + + private def loadConfig[A: Decoder](path: Option[Path], + load: TypesafeConfig => TypesafeConfig): EitherT[IO, String, A] = EitherT { + IO { + for { + config <- Either.catchNonFatal(handleInputPath(path)).leftMap(_.getMessage) + config <- Either.catchNonFatal(config.resolve()).leftMap(_.getMessage) + config <- Either.catchNonFatal(load(config)).leftMap(_.getMessage) + parsed <- config.as[A].leftMap(_.show) + } yield parsed + } + } + + private def handleInputPath(path: Option[Path]): TypesafeConfig = { + path match { + case Some(definedPath) => + //Fail when provided file doesn't exist + ConfigFactory.parseFile(definedPath.toFile, ConfigParseOptions.defaults().setAllowMissing(false)) + case None => ConfigFactory.empty() + } + } + + private def namespaced(config: TypesafeConfig): TypesafeConfig = { + val namespace = "collector" + if (config.hasPath(namespace)) + config.getConfig(namespace).withFallback(config.withoutPath(namespace)) + else + config + } + + implicit val resolverDecoder: Decoder[ResolverConfig] = Decoder.decodeJson.emap(json => Resolver.parseConfig(json).leftMap(_.show)) + + implicit val adaptersSchemasDecoder: Decoder[AdaptersSchemas] = + deriveDecoder[AdaptersSchemas] + implicit val enrichAdaptersSchemasDecoder: Decoder[EnrichAdaptersSchemas] = + deriveDecoder[EnrichAdaptersSchemas] + implicit val callrailSchemasDecoder: Decoder[CallrailSchemas] = + deriveDecoder[CallrailSchemas] + implicit val cloudfrontAccessLogSchemasDecoder: Decoder[CloudfrontAccessLogSchemas] = + deriveDecoder[CloudfrontAccessLogSchemas] + implicit val googleAnalyticsSchemasDecoder: Decoder[GoogleAnalyticsSchemas] = + deriveDecoder[GoogleAnalyticsSchemas] + implicit val hubspotSchemasDecoder: Decoder[HubspotSchemas] = + deriveDecoder[HubspotSchemas] + implicit val mailchimpSchemasDecoder: Decoder[MailchimpSchemas] = + deriveDecoder[MailchimpSchemas] + implicit val mailgunSchemasDecoder: Decoder[MailgunSchemas] = + deriveDecoder[MailgunSchemas] + implicit val mandrillSchemasDecoder: Decoder[MandrillSchemas] = + deriveDecoder[MandrillSchemas] + implicit val marketoSchemasDecoder: Decoder[MarketoSchemas] = + deriveDecoder[MarketoSchemas] + implicit val olarkSchemasDecoder: Decoder[OlarkSchemas] = + deriveDecoder[OlarkSchemas] + implicit val pagerdutySchemasDecoder: Decoder[PagerdutySchemas] = + deriveDecoder[PagerdutySchemas] + implicit val pingdomSchemasDecoder: Decoder[PingdomSchemas] = + deriveDecoder[PingdomSchemas] + implicit val sendgridSchemasDecoder: Decoder[SendgridSchemas] = + deriveDecoder[SendgridSchemas] + implicit val statusgatorSchemasDecoder: Decoder[StatusGatorSchemas] = + deriveDecoder[StatusGatorSchemas] + implicit val unbounceSchemasDecoder: Decoder[UnbounceSchemas] = + deriveDecoder[UnbounceSchemas] + implicit val urbanAirshipSchemasDecoder: Decoder[UrbanAirshipSchemas] = + deriveDecoder[UrbanAirshipSchemas] + implicit val veroSchemasDecoder: Decoder[VeroSchemas] = + deriveDecoder[VeroSchemas] + +} diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/IdImplicits.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/IdImplicits.scala deleted file mode 100644 index cdcc992..0000000 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/IdImplicits.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (c) 2019-2022 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ - -package com.snowplowanalytics.snowplow.micro - -import cats.Id -import cats.effect.Clock -import java.util.concurrent.TimeUnit - -object IdImplicits { - - implicit val clockProvider: Clock[Id] = new Clock[Id] { - final def realTime(unit: TimeUnit): Id[Long] = - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS) - final def monotonic(unit: TimeUnit): Id[Long] = - unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS) - } - -} diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/IgluService.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/IgluService.scala deleted file mode 100644 index c3fbff1..0000000 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/IgluService.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2019-2022 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ - -package com.snowplowanalytics.snowplow.micro - -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server.Route -import akka.http.scaladsl.model.StatusCodes.NotFound -import cats.Id -import io.circe.generic.auto._ - -import com.snowplowanalytics.iglu.client.resolver.Resolver -import com.snowplowanalytics.iglu.core.{SchemaVer, SchemaKey} - -import IdImplicits._ -import CirceSupport._ - -class IgluService(resolver: Resolver[Id]) { - - def get(vendor: String, name: String, versionStr: String): Route = - SchemaVer.parseFull(versionStr) match { - case Right(version) => - val key = SchemaKey(vendor, name, "jsonschema", version) - resolver.lookupSchema(key) match { - case Right(json) => complete(json) - case Left(error) => complete(NotFound, error) - } - case Left(_) => reject - } - -} diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala index e1b374b..0e8f570 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala @@ -12,96 +12,15 @@ */ package com.snowplowanalytics.snowplow.micro -import java.io.File - -import org.slf4j.LoggerFactory - -import scala.sys.process._ - -import akka.actor.ActorSystem -import akka.http.scaladsl.{ConnectionContext, Http} - -import cats.Id - -import com.snowplowanalytics.snowplow.collectors.scalastream.model.CollectorSinks - -import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry -import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, EnrichmentConf} -import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, ShiftExecution} - -import com.snowplowanalytics.snowplow.micro.ConfigHelper.MicroConfig -import com.snowplowanalytics.snowplow.micro.IdImplicits._ - -/** Read the configuration and instantiate Snowplow Micro, - * which acts as a `Collector` and has an in-memory sink - * holding the valid and invalid events. - * It offers an HTTP endpoint to query this sink. - */ -object Main { - lazy val logger = LoggerFactory.getLogger(getClass()) - - def main(args: Array[String]): Unit = { - val config = ConfigHelper.parseConfig(args) - run(config) - } - - def setupEnrichments(configs: List[EnrichmentConf]): EnrichmentRegistry[Id] = { - configs.flatMap(_.filesToCache).foreach { case (uri, location) => - logger.info(s"Downloading ${uri}...") - uri.toURL #> new File(location) !! - } - - val enrichmentRegistry = EnrichmentRegistry.build[Id](configs, BlockerF.noop, ShiftExecution.noop).value match { - case Right(ok) => ok - case Left(e) => - throw new IllegalArgumentException(s"Error while enabling enrichments: $e.") - } - - val loadedEnrichments = enrichmentRegistry.productIterator.toList.collect { - case Some(e: Enrichment) => e.getClass.getSimpleName - } - if (loadedEnrichments.nonEmpty) { - logger.info(s"Enabled enrichments: ${loadedEnrichments.mkString(", ")}") - } else { - logger.info(s"No enrichments enabled") - } - - enrichmentRegistry - } - - /** Create the in-memory sink, - * get the endpoints for both the collector and to query Snowplow Micro, - * and start the HTTP server. - */ - def run(config: MicroConfig): Unit = { - implicit val system = ActorSystem.create("snowplow-micro", config.akkaConfig) - implicit val executionContext = system.dispatcher - - val enrichmentRegistry = setupEnrichments(config.enrichmentConfigs) - val sinks = CollectorSinks( - MemorySink(config.igluClient, enrichmentRegistry, config.outputEnrichedTsv), - MemorySink(config.igluClient, enrichmentRegistry, config.outputEnrichedTsv) - ) - val igluService = new IgluService(config.igluResolver) - - val routes = Routing.getMicroRoutes(config.collectorConfig, sinks, igluService) - logger.info("UI available at /micro/ui") - - Http() - .newServerAt(config.collectorConfig.interface, config.collectorConfig.port) - .bind(routes) - .foreach { binding => - logger.info(s"REST interface bound to ${binding.localAddress}") - } - - config.sslContext.foreach { sslContext => - Http() - .newServerAt(config.collectorConfig.interface, config.collectorConfig.ssl.port) - .enableHttps(ConnectionContext.httpsServer(sslContext)) - .bind(routes) - .foreach { binding => - logger.info(s"HTTPS REST interface bound to ${binding.localAddress}") - } - } - } +import cats.effect.{ExitCode, IO} +import com.monovore.decline.Opts +import com.monovore.decline.effect.CommandIOApp + +object Main + extends CommandIOApp( + name = s"docker run ${BuildInfo.dockerAlias}", + header = "MICRO", + version = BuildInfo.version + ) { + override def main: Opts[IO[ExitCode]] = Run.run() } diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala index a78efd5..ecd44de 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala @@ -12,48 +12,53 @@ */ package com.snowplowanalytics.snowplow.micro +import cats.data.{EitherT, Validated} +import cats.effect.IO import cats.implicits._ -import cats.Id -import cats.data.Validated -import io.circe.syntax._ -import org.joda.time.DateTime -import org.slf4j.LoggerFactory import com.snowplowanalytics.iglu.client.IgluCirceClient +import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.snowplow.analytics.scalasdk.{Event, EventConverter} +import com.snowplowanalytics.snowplow.badrows.BadRow.{EnrichmentFailures, SchemaViolations, TrackerProtocolViolations} import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Payload, Processor} -import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.Sink +import com.snowplowanalytics.snowplow.collector.core.Sink +import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline import com.snowplowanalytics.snowplow.enrich.common.adapters.{AdapterRegistry, RawEvent} -import com.snowplowanalytics.snowplow.enrich.common.enrichments.{EnrichmentManager, EnrichmentRegistry} +import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentManager, EnrichmentRegistry} import com.snowplowanalytics.snowplow.enrich.common.loaders.ThriftLoader import com.snowplowanalytics.snowplow.enrich.common.utils.ConversionUtils -import IdImplicits._ -import com.snowplowanalytics.snowplow.badrows.BadRow.{EnrichmentFailures, SchemaViolations, TrackerProtocolViolations} -import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline +import io.circe.syntax._ +import org.joda.time.DateTime +import org.slf4j.LoggerFactory /** Sink of the collector that Snowplow Micro is. - * Contains the functions that are called for each tracking event sent - * to the collector endpoint. - * The events are received as `CollectorPayload`s serialized with Thrift. - * For each event it tries to validate it using Common Enrich, - * and then stores the results in-memory in [[ValidationCache]]. - */ -private[micro] final case class MemorySink(igluClient: IgluCirceClient[Id], enrichmentRegistry: EnrichmentRegistry[Id], outputEnrichedTsv: Boolean) extends Sink { - val MaxBytes = Int.MaxValue - private val processor = Processor(buildinfo.BuildInfo.name, buildinfo.BuildInfo.version) + * Contains the functions that are called for each tracking event sent + * to the collector endpoint. + * The events are received as `CollectorPayload`s serialized with Thrift. + * For each event it tries to validate it using Common Enrich, + * and then stores the results in-memory in [[ValidationCache]]. + */ +final class MemorySink(igluClient: IgluCirceClient[IO], + registryLookup: RegistryLookup[IO], + enrichmentRegistry: EnrichmentRegistry[IO], + outputEnrichedTsv: Boolean, + processor: Processor, + adapterRegistry: AdapterRegistry[IO]) extends Sink[IO] { + override val maxBytes = Int.MaxValue private lazy val logger = LoggerFactory.getLogger("EventLog") - /** Function of the [[Sink]] called for all the events received by a collector. */ - override def storeRawEvents(events: List[Array[Byte]], key: String) = { - events.foreach(bytes => processThriftBytes(bytes, igluClient, enrichmentRegistry, processor)) + override def isHealthy: IO[Boolean] = IO.pure(true) + + override def storeRawEvents(events: List[Array[Byte]], key: String): IO[Unit] = { + events.traverse(bytes => processThriftBytes(bytes)).void } - private def formatEvent(event: GoodEvent) = + private def formatEvent(event: GoodEvent): String = s"id:${event.event.event_id}" + - event.event.app_id.fold("")(i => s" app_id:$i") + - event.eventType.fold("")(t => s" type:$t") + - event.schema.fold("")(s => s" ($s)") + event.event.app_id.fold("")(i => s" app_id:$i") + + event.eventType.fold("")(t => s" type:$t") + + event.schema.fold("")(s => s" ($s)") - private def formatBadRow(badRow: BadRow) = badRow match { + private def formatBadRow(badRow: BadRow): String = badRow match { case TrackerProtocolViolations(_, Failure.TrackerProtocolViolations(_, _, _, messages), _) => messages.map(_.asJson).toList.mkString case SchemaViolations(_, Failure.SchemaViolations(_, messages), _) => @@ -64,73 +69,76 @@ private[micro] final case class MemorySink(igluClient: IgluCirceClient[Id], enri } /** Deserialize Thrift bytes into `CollectorPayload`s, - * validate them and store the result in [[ValidationCache]]. - * A `CollectorPayload` can contain several events. - */ - private[micro] def processThriftBytes( - thriftBytes: Array[Byte], - igluClient: IgluCirceClient[Id], - enrichmentRegistry: EnrichmentRegistry[Id], - processor: Processor - ): Unit = + * validate them and store the result in [[ValidationCache]]. + * A `CollectorPayload` can contain several events. + */ + private[micro] def processThriftBytes(thriftBytes: Array[Byte]): IO[Unit] = ThriftLoader.toCollectorPayload(thriftBytes, processor) match { case Validated.Valid(maybePayload) => maybePayload match { case Some(collectorPayload) => - new AdapterRegistry().toRawEvents(collectorPayload, igluClient, processor) match { + adapterRegistry.toRawEvents(collectorPayload, igluClient, processor, registryLookup).flatMap { case Validated.Valid(rawEvents) => - val (goodEvents, badEvents) = rawEvents.toList.foldRight((Nil, Nil) : (List[GoodEvent], List[BadEvent])) { - case (rawEvent, (good, bad)) => - validateEvent(rawEvent, igluClient, enrichmentRegistry, processor) match { + val partitionEvents = rawEvents.toList.foldLeftM((Nil, Nil): (List[GoodEvent], List[BadEvent])) { + case ((good, bad), rawEvent) => + validateEvent(rawEvent).value.map { case Right(goodEvent) => logger.info(s"GOOD ${formatEvent(goodEvent)}") (goodEvent :: good, bad) case Left((errors, badRow)) => val badEvent = - BadEvent( - Some(collectorPayload), - Some(rawEvent), - errors - ) + BadEvent( + Some(collectorPayload), + Some(rawEvent), + errors + ) logger.warn(s"BAD ${formatBadRow(badRow)}") (good, badEvent :: bad) } } - ValidationCache.addToGood(goodEvents) - ValidationCache.addToBad(badEvents) - if (outputEnrichedTsv) { - goodEvents.foreach { event => - println(event.event.toTsv) - } + partitionEvents.map { + case (goodEvents, badEvents) => + ValidationCache.addToGood(goodEvents) + ValidationCache.addToBad(badEvents) + if (outputEnrichedTsv) { + goodEvents.foreach { event => + println(event.event.toTsv) + } + } else () } case Validated.Invalid(badRow) => val bad = BadEvent(Some(collectorPayload), None, List("Error while extracting event(s) from collector payload and validating it/them.", badRow.compact)) logger.warn(s"BAD ${bad.errors.head}") - ValidationCache.addToBad(List(bad)) + IO(ValidationCache.addToBad(List(bad))) } case None => val bad = BadEvent(None, None, List("No payload.")) logger.warn(s"BAD ${bad.errors.head}") - ValidationCache.addToBad(List(bad)) + IO(ValidationCache.addToBad(List(bad))) } case Validated.Invalid(badRows) => val bad = BadEvent(None, None, List("Can't deserialize Thrift bytes.") ++ badRows.toList.map(_.compact)) logger.warn(s"BAD ${bad.errors.head}") - ValidationCache.addToBad(List(bad)) + IO(ValidationCache.addToBad(List(bad))) } /** Validate the raw event using Common Enrich logic, and extract the event type if any, - * the schema if any, and the schemas of the contexts attached to the event if any. - * @return [[GoodEvent]] with the extracted event type, schema and contexts, - * or error if the event couldn't be validated. - */ - private[micro] def validateEvent( - rawEvent: RawEvent, - igluClient: IgluCirceClient[Id], - enrichmentRegistry: EnrichmentRegistry[Id], - processor: Processor - ): Either[(List[String], BadRow), GoodEvent] = - EnrichmentManager.enrichEvent[Id](enrichmentRegistry, igluClient, processor, DateTime.now(), rawEvent, EtlPipeline.FeatureFlags(acceptInvalid = false, legacyEnrichmentOrder = false), ()) + * the schema if any, and the schemas of the contexts attached to the event if any. + * @return [[GoodEvent]] with the extracted event type, schema and contexts, + * or error if the event couldn't be validated. + */ + private[micro] def validateEvent(rawEvent: RawEvent): EitherT[IO, (List[String], BadRow), GoodEvent] = + EnrichmentManager.enrichEvent[IO]( + enrichmentRegistry, + igluClient, + processor, + DateTime.now(), + rawEvent, + EtlPipeline.FeatureFlags(acceptInvalid = false, legacyEnrichmentOrder = false), + IO.unit, + registryLookup, + AtomicFields.from(Map.empty) + ) .subflatMap { enriched => EventConverter.fromEnriched(enriched) .leftMap { failure => @@ -138,18 +146,17 @@ private[micro] final case class MemorySink(igluClient: IgluCirceClient[Id], enri } .toEither } - .value.bimap( - badRow => (List("Error while validating the event.", badRow.compact), badRow), - enriched => GoodEvent(rawEvent, enriched.event, getEnrichedSchema(enriched), getEnrichedContexts(enriched), enriched) - ) + .bimap( + badRow => (List("Error while validating the event.", badRow.compact), badRow), + enriched => GoodEvent(rawEvent, enriched.event, getEnrichedSchema(enriched), getEnrichedContexts(enriched), enriched) + ) + - private[micro] def getEnrichedSchema(enriched: Event): Option[String] = + private def getEnrichedSchema(enriched: Event): Option[String] = List(enriched.event_vendor, enriched.event_name, enriched.event_format, enriched.event_version) .sequence .map(_.mkString("iglu:", "/", "")) - private[micro] def getEnrichedContexts(enriched: Event): List[String] = + private def getEnrichedContexts(enriched: Event): List[String] = enriched.contexts.data.map(_.schema.toSchemaUri) - - override def shutdown(): Unit = () } diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/MicroHttpServer.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/MicroHttpServer.scala new file mode 100644 index 0000000..dc049de --- /dev/null +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/MicroHttpServer.scala @@ -0,0 +1,97 @@ +package com.snowplowanalytics.snowplow.micro + +import cats.effect.{IO, Resource} +import com.avast.datadog4s.api.Tag +import com.avast.datadog4s.extension.http4s.DatadogMetricsOps +import com.avast.datadog4s.{StatsDMetricFactory, StatsDMetricFactoryConfig} +import com.snowplowanalytics.snowplow.collector.core.{Config => CollectorConfig} +import com.snowplowanalytics.snowplow.micro.Configuration.{MicroConfig, SinkConfig} +import org.http4s.blaze.server.BlazeServerBuilder +import org.http4s.headers.`Strict-Transport-Security` +import org.http4s.server.Server +import org.http4s.server.middleware.{HSTS, Metrics, Timeout, Logger => LoggerMiddleware} +import org.http4s.{HttpApp, HttpRoutes} +import org.typelevel.ci.CIString +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +import java.net.InetSocketAddress +import javax.net.ssl.SSLContext + + +/** Similar to HTTP server builder as in collector but with option to customize SSLContext */ +object MicroHttpServer { + + implicit private def logger: Logger[IO] = Slf4jLogger.getLogger[IO] + + def build(routes: HttpRoutes[IO], + config: MicroConfig, + sslContext: Option[SSLContext]): Resource[IO, Unit] = + for { + withMetricsMiddleware <- createMetricsMiddleware(routes, config.collector.monitoring.metrics) + _ <- Resource.eval(Logger[IO].info("Building blaze server")) + _ <- buildHTTPServer(withMetricsMiddleware, config) + _ <- sslContext.map(definedSSL => buildHTTPSServer(withMetricsMiddleware, config, definedSSL)).getOrElse(Resource.unit[IO]) + } yield () + + private def buildHTTPServer(routes: HttpRoutes[IO], config: MicroConfig): Resource[IO, Server] = + builder(routes, config.collector) + .bindSocketAddress(new InetSocketAddress(config.collector.port)) + .resource + + private def buildHTTPSServer(routes: HttpRoutes[IO], config: MicroConfig, sslContext: SSLContext): Resource[IO, Server] = + builder(routes, config.collector) + .bindSocketAddress(new InetSocketAddress(config.collector.ssl.port)) + .withSslContext(sslContext) + .resource + + private def builder(routes: HttpRoutes[IO], config: CollectorConfig[SinkConfig]): BlazeServerBuilder[IO] = { + BlazeServerBuilder[IO] + .withHttpApp( + loggerMiddleware(timeoutMiddleware(hstsMiddleware(config.hsts, routes.orNotFound), config.networking), config.debug.http) + ) + .withIdleTimeout(config.networking.idleTimeout) + .withMaxConnections(config.networking.maxConnections) + .withResponseHeaderTimeout(config.networking.responseHeaderTimeout) + .withLengthLimits( + maxRequestLineLen = config.networking.maxRequestLineLength, + maxHeadersLen = config.networking.maxHeadersLength + ) + } + + private def loggerMiddleware(routes: HttpApp[IO], config: CollectorConfig.Debug.Http): HttpApp[IO] = + if (config.enable) { + LoggerMiddleware.httpApp[IO]( + logHeaders = config.logHeaders, + logBody = config.logBody, + redactHeadersWhen = config.redactHeaders.map(CIString(_)).contains(_), + logAction = Some((msg: String) => Logger[IO].debug(msg)) + )(routes) + } else routes + + private def timeoutMiddleware(routes: HttpApp[IO], networking: CollectorConfig.Networking): HttpApp[IO] = + Timeout.httpApp[IO](timeout = networking.responseHeaderTimeout)(routes) + + + private def createMetricsMiddleware(routes: HttpRoutes[IO], + metricsCollectorConfig: CollectorConfig.Metrics): Resource[IO, HttpRoutes[IO]] = + if (metricsCollectorConfig.statsd.enabled) { + val metricsFactory = StatsDMetricFactory.make[IO](createStatsdCollectorConfig(metricsCollectorConfig)) + metricsFactory.evalMap(DatadogMetricsOps.builder[IO](_).useDistributionBasedTimers().build()).map { metricsOps => + Metrics[IO](metricsOps)(routes) + } + } else { + Resource.pure(routes) + } + + private def createStatsdCollectorConfig(metricsCollectorConfig: CollectorConfig.Metrics): StatsDMetricFactoryConfig = { + val server = InetSocketAddress.createUnresolved(metricsCollectorConfig.statsd.hostname, metricsCollectorConfig.statsd.port) + val tags = metricsCollectorConfig.statsd.tags.toVector.map { case (name, value) => Tag.of(name, value) } + StatsDMetricFactoryConfig(Some(metricsCollectorConfig.statsd.prefix), server, defaultTags = tags) + } + + private def hstsMiddleware(hsts: CollectorConfig.HSTS, routes: HttpApp[IO]): HttpApp[IO] = + if (hsts.enable) + HSTS(routes, `Strict-Transport-Security`.unsafeFromDuration(hsts.maxAge)) + else routes +} diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Routing.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Routing.scala index 54811b3..9a2d2d8 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/Routing.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Routing.scala @@ -1,121 +1,109 @@ -/* - * Copyright (c) 2019-2022 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ package com.snowplowanalytics.snowplow.micro -import akka.http.scaladsl.server.{Route, RouteResult} -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.model.headers.{`Access-Control-Allow-Methods`, `Access-Control-Allow-Headers`} -import akka.http.scaladsl.model.{HttpMethods, StatusCodes} +import cats.effect.IO +import com.snowplowanalytics.iglu.client.ClientError.ResolutionError +import com.snowplowanalytics.iglu.client.resolver.Resolver +import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} +import com.snowplowanalytics.snowplow.analytics.scalasdk.Event +import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent +import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload +import com.snowplowanalytics.snowplow.micro.Routing._ +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.syntax.EncoderOps +import io.circe.{Decoder, Encoder} +import org.apache.http.NameValuePair +import org.http4s.circe.CirceEntityDecoder._ +import org.http4s.circe.CirceEntityEncoder._ +import org.http4s.dsl.Http4sDsl +import org.http4s.{HttpRoutes, Response, StaticFile} +import org.joda.time.DateTime -import io.circe.generic.auto._ +final class Routing(igluResolver: Resolver[IO]) + (implicit lookup: RegistryLookup[IO]) extends Http4sDsl[IO] { -import com.snowplowanalytics.snowplow.collectors.scalastream.model.{CollectorConfig, CollectorSinks} -import com.snowplowanalytics.snowplow.collectors.scalastream.{CollectorRoute, CollectorService, HealthService} - -import scala.concurrent.ExecutionContext - -import CirceSupport._ - -/** Contain definitions of the routes (endpoints) for Snowplow Micro. - * Make the link between Snowplow Micro endpoints and the functions called. - * Snowplow Micro has 2 types of endpoints: - * - to receive tracking events; - * - to query the validated events. - * - * More information about an Akka HTTP routes can be found here: - * https://doc.akka.io/docs/akka-http/current/routing-dsl/routes.html. - */ -private[micro] object Routing { - - /** Create `Route` for Snowplow Micro, with the endpoints of the collector to receive tracking events - * and the endpoints to query the validated events. - */ - def getMicroRoutes( - collectorConf: CollectorConfig, - collectorSinks: CollectorSinks, - igluService: IgluService - )(implicit ec: ExecutionContext): Route = { - val c = new CollectorService(collectorConf, collectorSinks, buildinfo.BuildInfo.name, buildinfo.BuildInfo.version) - - val health = new HealthService.Settable { - toHealthy() - } - val collectorRoutes = new CollectorRoute { - override def collectorService = c - override def healthService = health - }.collectorRoute - - withCors(c) { - pathPrefix("micro") { - (get | post) { - path("all") { - complete(ValidationCache.getSummary()) - } ~ path("reset") { - ValidationCache.reset() - complete(ValidationCache.getSummary()) - } - } ~ get { - path("good") { - complete(ValidationCache.filterGood(FiltersGood(None, None, None, None))) - } ~ path("bad") { - complete(ValidationCache.filterBad(FiltersBad(None, None, None))) - } ~ pathPrefix("ui") { - pathEndOrSingleSlash { - getFromResource("ui/index.html") - } ~ path("errors") { - getFromResource("ui/errors.html") - } ~ getFromResourceDirectory("ui") ~ getFromResource("ui/404.html") + val value: HttpRoutes[IO] = HttpRoutes.of[IO] { + case request@method -> "micro" /: path => + (method, path.segments.head.encoded) match { + case (POST | GET, "all") => + Ok(ValidationCache.getSummary()) + case (POST | GET, "reset") => + ValidationCache.reset() + Ok(ValidationCache.getSummary()) + case (GET, "good") => + Ok(ValidationCache.filterGood(FiltersGood(None, None, None, None))) + case (POST, "good") => + request.as[FiltersGood].flatMap { filters => + Ok(ValidationCache.filterGood(filters).asJson) } - } ~ post { - path("good") { - entity(as[FiltersGood]) { filters => - complete(ValidationCache.filterGood(filters)) - } - } ~ path("bad") { - entity(as[FiltersBad]) { filters => - complete(ValidationCache.filterBad(filters)) - } + case (GET, "bad") => + Ok(ValidationCache.filterBad(FiltersBad(None, None, None))) + case (POST, "bad") => + request.as[FiltersBad].flatMap { filters => + Ok(ValidationCache.filterBad(filters)) } - } ~ options { - complete(StatusCodes.OK) - } ~ pathPrefix("iglu") { - path(Segment / Segment / "jsonschema" / Segment) { - igluService.get(_, _, _) - } ~ { - complete(StatusCodes.NotFound, "Schema lookup should be in format iglu/{vendor}/{schemaName}/jsonschema/{model}-{revision}-{addition}") + case (GET, "iglu") => + path match { + case Path.empty / "iglu" / vendor / name / "jsonschema" / versionVar => + lookupSchema(vendor, name, versionVar) + case _ => + NotFound("Schema lookup should be in format iglu/{vendor}/{schemaName}/jsonschema/{model}-{revision}-{addition}") } - } ~ { - complete(StatusCodes.NotFound, "Path for micro has to be one of: /all /good /bad /reset /iglu") - } + case (GET, "ui") => + handleUIPath(path) + case _ => + NotFound("Path for micro has to be one of: /all /good /bad /reset /iglu") } - } ~ collectorRoutes } - /** Wrap a Route with CORS header handling. - * - * Reuses the implementation used by the stream collector - */ - private def withCors(c: CollectorService)(route: Route)(implicit ec: ExecutionContext): Route = - extractRequest { request => requestContext => - route(requestContext).map { - case RouteResult.Complete(response) => - val r = response.withHeaders(List( - `Access-Control-Allow-Methods`(List(HttpMethods.POST, HttpMethods.GET, HttpMethods.OPTIONS)), - c.accessControlAllowOriginHeader(request), - `Access-Control-Allow-Headers`("Content-Type") - )) - RouteResult.Complete(r) - case other => other - } + private def handleUIPath(path: Path): IO[Response[IO]] = { + path match { + case Path.empty / "ui" | Path.empty / "ui" / "/" => + resource("ui/index.html") + case Path.empty / "ui" / "errors" => + resource("ui/errors.html") + case other => + resource(other.renderString) + } + } + + private def resource(path: String): IO[Response[IO]] = { + StaticFile.fromResource[IO](path) + .getOrElseF(NotFound()) + } + + private def lookupSchema(vendor: String, name: String, versionVar: String): IO[Response[IO]] = { + SchemaVer.parseFull(versionVar) match { + case Right(version) => + val key = SchemaKey(vendor, name, "jsonschema", version) + igluResolver.lookupSchema(key).flatMap { + case Right(json) => Ok(json) + case Left(error) => NotFound(error) + } + case Left(_) => NotFound("Schema lookup should be in format iglu/{vendor}/{schemaName}/jsonschema/{model}-{revision}-{addition}") } + } } + +object Routing { + + implicit val dateTimeEncoder: Encoder[DateTime] = + Encoder[String].contramap(_.toString) + + implicit val nameValuePairEncoder: Encoder[NameValuePair] = + Encoder[String].contramap(kv => s"${kv.getName}=${kv.getValue}") + + implicit val vs: Encoder[ValidationSummary] = deriveEncoder + implicit val ge: Encoder[GoodEvent] = deriveEncoder + implicit val rwe: Encoder[RawEvent] = deriveEncoder + implicit val cp: Encoder[CollectorPayload] = deriveEncoder + implicit val cpa: Encoder[CollectorPayload.Api] = deriveEncoder + implicit val cps: Encoder[CollectorPayload.Source] = deriveEncoder + implicit val cpc: Encoder[CollectorPayload.Context] = deriveEncoder + implicit val e: Encoder[Event] = deriveEncoder + implicit val be: Encoder[BadEvent] = deriveEncoder + implicit val re: Encoder[ResolutionError] = deriveEncoder + + implicit val fg: Decoder[FiltersGood] = deriveDecoder + implicit val fb: Decoder[FiltersBad] = deriveDecoder +} \ No newline at end of file diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala new file mode 100644 index 0000000..eec7113 --- /dev/null +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala @@ -0,0 +1,157 @@ +/** + * Copyright (c) 2013-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ +package com.snowplowanalytics.snowplow.micro + +import cats.data.EitherT +import cats.effect.{ExitCode, IO, Resource} +import cats.implicits._ +import com.monovore.decline.Opts +import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup +import com.snowplowanalytics.snowplow.badrows.Processor +import com.snowplowanalytics.snowplow.collector.core._ +import com.snowplowanalytics.snowplow.collector.core.model.Sinks +import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry +import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry +import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, EnrichmentConf} +import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution} +import com.snowplowanalytics.snowplow.micro.Configuration.MicroConfig +import org.http4s.ember.client.EmberClientBuilder +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +import java.io.File +import java.security.{KeyStore, SecureRandom} +import java.util.concurrent.Executors +import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory} +import scala.concurrent.ExecutionContext +import scala.sys.process._ + +object Run { + + implicit private def logger: Logger[IO] = Slf4jLogger.getLogger[IO] + + def run(): Opts[IO[ExitCode]] = { + Configuration.load().map { configuration => + handleAppErrors { + configuration + .semiflatMap { validMicroConfig => + buildEnvironment(validMicroConfig) + .use(_ => IO.never) + .as(ExitCode.Success) + } + } + } + } + + private def buildEnvironment(config: MicroConfig): Resource[IO, Unit] = { + for { + sslContext <- Resource.eval(setupSSLContext()) + enrichmentRegistry <- buildEnrichmentRegistry(config.enrichmentsConfig) + badProcessor = Processor(BuildInfo.name, BuildInfo.version) + adapterRegistry = new AdapterRegistry[IO](Map.empty, config.adaptersSchemas.adaptersSchemas) + lookup = JavaNetRegistryLookup.ioLookupInstance[IO] + sink = new MemorySink(config.iglu.client, lookup, enrichmentRegistry, config.outputEnrichedTsv, badProcessor, adapterRegistry) + collectorService = new Service[IO]( + config.collector, + Sinks(sink, sink), + BuildInfo + ) + collectorRoutes = new Routes[IO]( + config.collector.enableDefaultRedirect, + config.collector.rootResponse.enabled, + config.collector.crossDomain.enabled, + config.collector.networking.bodyReadTimeout, + collectorService + ).value + + miniRoutes = new Routing(config.iglu.resolver)(lookup).value + allRoutes = miniRoutes <+> collectorRoutes + _ <- MicroHttpServer.build(allRoutes, config, sslContext) + } yield () + } + + private def setupSSLContext(): IO[Option[SSLContext]] = IO { + sys.env.get(Configuration.EnvironmentVariables.sslCertificatePassword).map { password => + // Adapted from https://doc.akka.io/docs/akka-http/current/server-side/server-https-support.html. + // We could use SSLContext.getDefault instead of all of this, but then we would need to + // force the user to add arcane -D flags when running Micro, which is not the best experience. + val keystore = KeyStore.getInstance("PKCS12") + val certificateFile = getClass.getClassLoader.getResourceAsStream("ssl-certificate.p12") + keystore.load(certificateFile, password.toCharArray) + + val keyManagerFactory = KeyManagerFactory.getInstance("SunX509") + keyManagerFactory.init(keystore, password.toCharArray) + + val trustManagerFactory = TrustManagerFactory.getInstance("SunX509") + trustManagerFactory.init(keystore) + + val context = SSLContext.getInstance("TLS") + context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom) + context + } + } + + private def buildEnrichmentRegistry(configs: List[EnrichmentConf]): Resource[IO, EnrichmentRegistry[IO]] = { + for { + _ <- Resource.eval(downloadAssets(configs)) + shift <- ShiftExecution.ofSingleThread[IO] + httpClient <- EmberClientBuilder.default[IO].build.map(HttpClient.fromHttp4sClient[IO]) + blockingEC = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool) + enrichmentRegistry <- Resource.eval(EnrichmentRegistry.build[IO](configs, shift, httpClient, blockingEC) + .leftMap(error => new IllegalArgumentException(s"can't build EnrichmentRegistry: $error")) + .value.rethrow) + _ <- Resource.eval { + val loadedEnrichments = enrichmentRegistry.productIterator.toList.collect { + case Some(e: Enrichment) => e.getClass.getSimpleName + } + if (loadedEnrichments.nonEmpty) { + logger.info(s"Enabled enrichments: ${loadedEnrichments.mkString(", ")}") + } else { + logger.info(s"No enrichments enabled") + } + } + + } yield enrichmentRegistry + } + + private def downloadAssets(configs: List[EnrichmentConf]): IO[Unit] = { + configs + .flatMap(_.filesToCache) + .traverse_ { case (uri, location) => + logger.info(s"Downloading $uri...") *> IO(uri.toURL #> new File(location) !!) + } + } + + private def handleAppErrors(appOutput: EitherT[IO, String, ExitCode]): IO[ExitCode] = { + appOutput + .leftSemiflatMap { error => + logger.error(error).as(ExitCode.Error) + } + .merge + .handleErrorWith { exception => + logger.error(exception)("Exiting") >> + prettyLogException(exception).as(ExitCode.Error) + } + } + + private def prettyLogException(e: Throwable): IO[Unit] = { + + def logCause(e: Throwable): IO[Unit] = + Option(e.getCause) match { + case Some(e) => logger.error(s"caused by: ${e.getMessage}") >> logCause(e) + case None => IO.unit + } + + logger.error(e.getMessage) >> logCause(e) + } + + +} diff --git a/src/test/scala/com.snowplowanalytics.snowplow.micro/ConfigHelperSpec.scala b/src/test/scala/com.snowplowanalytics.snowplow.micro/ConfigHelperSpec.scala index 58b1b1f..ff511ec 100644 --- a/src/test/scala/com.snowplowanalytics.snowplow.micro/ConfigHelperSpec.scala +++ b/src/test/scala/com.snowplowanalytics.snowplow.micro/ConfigHelperSpec.scala @@ -12,12 +12,33 @@ */ package com.snowplowanalytics.snowplow.micro +import cats.effect.IO +import cats.effect.testing.specs2.CatsEffect +import com.monovore.decline.Command +import com.snowplowanalytics.snowplow.micro.Configuration.MicroConfig import org.specs2.mutable.Specification -class ConfigHelperSpec extends Specification { - "ConfigHelper" >> { - "will produce a valid parsed collector config if `--collector-config` is not present" >> { - ConfigHelper.parseConfig(Array()) must not(throwA[Exception]) +class ConfigHelperSpec extends Specification with CatsEffect { + "Configuration loader should work when" >> { + "no custom args are provided and only defaults are used" >> { + load(args = List.empty) + .map { result => + result must beRight[MicroConfig].like { + case config => + config.collector.port must beEqualTo(9090) + config.collector.ssl.enable must beFalse + config.collector.ssl.port must beEqualTo(9543) + + config.enrichmentsConfig.isEmpty must beTrue + config.iglu.resolver.repos.map(_.config.name) must containTheSameElementsAs(List("Iglu Central", "Iglu Central - Mirror 01")) + + config.outputEnrichedTsv must beFalse + } + } } } + + private def load(args: List[String]): IO[Either[String, MicroConfig]] = { + Command("test-app", "test")(Configuration.load()).parse(args).right.get.value + } } diff --git a/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala b/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala index 18c707c..d500be0 100644 --- a/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala +++ b/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala @@ -12,136 +12,164 @@ */ package com.snowplowanalytics.snowplow.micro -import org.specs2.mutable.Specification - -import cats.Id - +import cats.effect.testing.specs2.CatsResource +import cats.effect.{IO, Resource} import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.Resolver -import com.snowplowanalytics.iglu.client.resolver.registries.Registry - +import com.snowplowanalytics.iglu.client.resolver.registries.{JavaNetRegistryLookup, Registry} +import com.snowplowanalytics.snowplow.badrows.Processor +import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry +import org.specs2.mutable.SpecificationLike -import com.snowplowanalytics.snowplow.badrows.Processor +class MemorySinkSpec extends CatsResource[IO, MemorySink] with SpecificationLike { -class MemorySinkSpec extends Specification { import events._ - private val igluClient = IgluCirceClient.fromResolver[Id](Resolver(List(Registry.IgluCentral), None), 500) - private val enrichmentRegistry = new EnrichmentRegistry[Id]() - private val processor = Processor(buildinfo.BuildInfo.name, buildinfo.BuildInfo.version) - private val sink = MemorySink(igluClient, enrichmentRegistry, outputEnrichedTsv = false) + override val resource: Resource[IO, MemorySink] = Resource.eval(createSink()) sequential "processThriftBytes" >> { - "should add a BadEvent to the cache if the array of bytes is not a valid Thrift payload" >> { + "should add a BadEvent to the cache if the array of bytes is not a valid Thrift payload" >> withResource { sink => ValidationCache.reset() val bytes = Array(1, 3, 5, 7).map(_.toByte) - sink.processThriftBytes(bytes, igluClient, enrichmentRegistry, processor) - ValidationCache.filterBad() must beLike { case List(badEvent) if badEvent.errors.exists(_.contains("Can't deserialize Thrift bytes")) => ok } - ValidationCache.filterGood().size must_== 0 + sink.processThriftBytes(bytes).map { _ => + ValidationCache.filterBad() must beLike { case List(badEvent) if badEvent.errors.exists(_.contains("Can't deserialize Thrift bytes")) => ok } + ValidationCache.filterGood().size must beEqualTo(0) + } } - "should add a BadEvent to the cache if RawEvent(s) can't be extracted from the CollectorPayload" >> { + "should add a BadEvent to the cache if RawEvent(s) can't be extracted from the CollectorPayload" >> withResource { sink => ValidationCache.reset() val bytes = buildThriftBytesBadCollectorPayload() - sink.processThriftBytes(bytes, igluClient, enrichmentRegistry, processor) - ValidationCache.filterBad() must beLike { case List(badEvent) if badEvent.errors.exists(_.contains("Error while extracting event(s) from collector payload")) => ok } - ValidationCache.filterGood().size must_== 0 + sink.processThriftBytes(bytes).map { _ => + ValidationCache.filterBad() must beLike { case List(badEvent) if badEvent.errors.exists(_.contains("Error while extracting event(s) from collector payload")) => ok } + ValidationCache.filterGood().size must beEqualTo(0) + } } - "should add a GoodEvent and a BadEvent to the cache for a CollectorPayload containing both" >> { + "should add a GoodEvent and a BadEvent to the cache for a CollectorPayload containing both" >> withResource { sink => ValidationCache.reset() val bytes = buildThriftBytes1Good1Bad() - sink.processThriftBytes(bytes, igluClient, enrichmentRegistry, processor) - ValidationCache.filterBad() must beLike { case List(badEvent) if badEvent.errors.exists(_.contains("Error while validating the event")) => ok } - ValidationCache.filterGood().size must_== 1 + sink.processThriftBytes(bytes).map { _ => + ValidationCache.filterBad() must beLike { case List(badEvent) if badEvent.errors.exists(_.contains("Error while validating the event")) => ok } + ValidationCache.filterGood().size must beEqualTo(1) + } } } "validateEvent" >> { - "should fail if the timestamp is not valid" >> { + "should fail if the timestamp is not valid" >> withResource { sink => val raw = buildRawEvent() val withoutTimestamp = raw.copy(context = raw.context.copy(timestamp = None)) val expected = "Error while validating the event" - sink.validateEvent(withoutTimestamp, igluClient, enrichmentRegistry, processor) must beLeft.like { - case (errors, _) if errors.exists(_.contains(expected)) => ok - case errs => ko(s"$errs doesn't contain [$expected]") + sink.validateEvent(withoutTimestamp).value.map { result => + result must beLeft.like { + case (errors, _) if errors.exists(_.contains(expected)) => ok + case errs => ko(s"$errs doesn't contain [$expected]") + } } } - "should fail if the event type parameter is not set" >> { + "should fail if the event type parameter is not set" >> withResource { sink => val raw = buildRawEvent() val withoutEvent = raw.copy(parameters = raw.parameters - "e") val expected = "Error while validating the event" - sink.validateEvent(withoutEvent, igluClient, enrichmentRegistry, processor) must beLeft.like { - case (errors, _) if errors.exists(_.contains(expected)) => ok - case errs => ko(s"$errs doesn't contain [$expected]") + sink.validateEvent(withoutEvent).value.map { result => + result must beLeft.like { + case (errors, _) if errors.exists(_.contains(expected)) => ok + case errs => ko(s"$errs doesn't contain [$expected]") + } } } - "should fail for an invalid unstructured event" >> { + "should fail for an invalid unstructured event" >> withResource { sink => val raw = buildRawEvent(Some(buildUnstruct(sdjInvalid))) val expected = "Error while validating the event" - sink.validateEvent(raw, igluClient, enrichmentRegistry, processor) must beLeft.like { - case (errors, _) if errors.exists(_.contains(expected)) => ok - case errs => ko(s"$errs doesn't contain [$expected]") + sink.validateEvent(raw).value.map { result => + result must beLeft.like { + case (errors, _) if errors.exists(_.contains(expected)) => ok + case errs => ko(s"$errs doesn't contain [$expected]") + } } } - "should fail if the event has an invalid context" >> { + "should fail if the event has an invalid context" >> withResource { sink => val raw = buildRawEvent(None, Some(buildContexts(List(sdjInvalid)))) val expected = "Error while validating the event" - sink.validateEvent(raw, igluClient, enrichmentRegistry, processor) must beLeft.like { - case (errors, _) if errors.exists(_.contains(expected)) => ok - case errs => ko(s"$errs doesn't contain [$expected]") + sink.validateEvent(raw).value.map { result => + result must beLeft.like { + case (errors, _) if errors.exists(_.contains(expected)) => ok + case errs => ko(s"$errs doesn't contain [$expected]") + } } } - "should fail for a unstructured event with an unknown schema" >> { + "should fail for a unstructured event with an unknown schema" >> withResource { sink => val raw = buildRawEvent(Some(buildUnstruct(sdjDoesNotExist))) val expected = "Error while validating the event" - sink.validateEvent(raw, igluClient, enrichmentRegistry, processor) must beLeft.like { - case (errors, _) if errors.exists(_.contains(expected)) => ok - case errs => ko(s"$errs doesn't contain [$expected]") + sink.validateEvent(raw).value.map { result => + result must beLeft.like { + case (errors, _) if errors.exists(_.contains(expected)) => ok + case errs => ko(s"$errs doesn't contain [$expected]") + } } } - "should fail if the event has a context with an unknown schema" >> { + "should fail if the event has a context with an unknown schema" >> withResource { sink => val raw = buildRawEvent(None, Some(buildContexts(List(sdjDoesNotExist)))) val expected = "Error while validating the event" - sink.validateEvent(raw, igluClient, enrichmentRegistry, processor) must beLeft.like { - case (errors, _) if errors.exists(_.contains(expected)) => ok - case errs => ko(s"$errs doesn't contain [$expected]") + sink.validateEvent(raw).value.map { result => + result must beLeft.like { + case (errors, _) if errors.exists(_.contains(expected)) => ok + case errs => ko(s"$errs doesn't contain [$expected]") + } } } - "extract the type of an event" >> { + "extract the type of an event" >> withResource { sink => val raw = buildRawEvent() val expected = "page_ping" - sink.validateEvent(raw, igluClient, enrichmentRegistry, processor) must beRight.like { - case GoodEvent(_, typE, _, _, _) if typE == Some(expected) => ok - case GoodEvent(_, typE, _, _, _) => ko(s"extracted type $typE isn't $expected") + sink.validateEvent(raw).value.map { result => + result must beRight.like { + case GoodEvent(_, typE, _, _, _) if typE == Some(expected) => ok + case GoodEvent(_, typE, _, _, _) => ko(s"extracted type $typE isn't $expected") + } } } - "should extract the schema of an unstructured event" >> { + "should extract the schema of an unstructured event" >> withResource { sink => val raw = buildRawEvent(Some(buildUnstruct(sdjLinkClick))) val expected = schemaLinkClick - sink.validateEvent(raw, igluClient, enrichmentRegistry, processor) must beRight.like { - case GoodEvent(_, _, schema, _, _) if schema == Some(expected) => ok - case GoodEvent(_, _, schema, _, _) => ko(s"extracted schema $schema isn't $expected") + sink.validateEvent(raw).value.map { result => + result must beRight.like { + case GoodEvent(_, _, schema, _, _) if schema == Some(expected) => ok + case GoodEvent(_, _, schema, _, _) => ko(s"extracted schema $schema isn't $expected") + } } } - "should extract the contexts of an event" >> { + "should extract the contexts of an event" >> withResource { sink => val raw = buildRawEvent(None, Some(buildContexts(List(sdjLinkClick, sdjMobileContext)))) val expected = List(schemaLinkClick, schemaMobileContext) - sink.validateEvent(raw, igluClient, enrichmentRegistry, processor) must beRight.like { - case GoodEvent(_, _, _, contexts, _) if contexts == expected => ok - case GoodEvent(_, _, _, contexts, _) => ko(s"extracted contexts $contexts isn't $expected") + sink.validateEvent(raw).value.map { result => + result must beRight.like { + case GoodEvent(_, _, _, contexts, _) if contexts == expected => ok + case GoodEvent(_, _, _, contexts, _) => ko(s"extracted contexts $contexts isn't $expected") + } } } } + + private def createSink(): IO[MemorySink] = { + for { + igluClient <- IgluCirceClient.fromResolver[IO](Resolver(List(Registry.IgluCentral), None), 500) + enrichmentRegistry = new EnrichmentRegistry[IO]() + processor = Processor(BuildInfo.name, BuildInfo.version) + adapterRegistry = new AdapterRegistry[IO](Map.empty, TestAdapterRegistry.adaptersSchemas) + lookup = JavaNetRegistryLookup.ioLookupInstance[IO] + } yield new MemorySink(igluClient, lookup, enrichmentRegistry, false, processor, adapterRegistry) + } + } diff --git a/src/test/scala/com.snowplowanalytics.snowplow.micro/MicroApiSpec.scala b/src/test/scala/com.snowplowanalytics.snowplow.micro/MicroApiSpec.scala new file mode 100644 index 0000000..1a76055 --- /dev/null +++ b/src/test/scala/com.snowplowanalytics.snowplow.micro/MicroApiSpec.scala @@ -0,0 +1,85 @@ +package com.snowplowanalytics.snowplow.micro + +import cats.effect.IO +import cats.effect.kernel.Resource +import cats.effect.testing.specs2.CatsEffect +import io.circe.Json +import org.http4s.Method.GET +import org.http4s.Request +import org.http4s.blaze.client.BlazeClientBuilder +import org.http4s.circe.CirceEntityCodec.circeEntityDecoder +import org.http4s.client.Client +import org.http4s.implicits.http4sLiteralsSyntax +import org.specs2.mutable.Specification +import org.specs2.specification.BeforeAfterEach + +import scala.concurrent.duration.{Duration, DurationInt} + +class MicroApiSpec extends Specification with CatsEffect with BeforeAfterEach { + sequential + + override protected val Timeout: Duration = 1.minute + + override protected def before: Unit = ValidationCache.reset() + + override protected def after: Unit = ValidationCache.reset() + + "Micro should accept good data" in { + setup().use { client => + for { + _ <- client.run(Request(GET, uri"http://localhost:9090/i?e=pp&p=web&tv=lol")).use_ + all <- client.run(Request(GET, uri"http://localhost:9090/micro/all")).use(_.as[Json]) + good <- client.run(Request(GET, uri"http://localhost:9090/micro/good")).use(_.as[Json]) + } yield { + + all.noSpaces must beEqualTo("""{"total":1,"good":1,"bad":0}""") + good.isArray must beTrue + good.asArray.map(_.length).get must beEqualTo(1) + + val firstEvent = good.hcursor.downN(0) + firstEvent.downField("eventType").as[String] must beRight("page_ping") + firstEvent.downField("schema").as[String] must beRight("iglu:com.snowplowanalytics.snowplow/page_ping/jsonschema/1-0-0") + firstEvent.downField("event").downField("platform").as[String] must beRight("web") + firstEvent.downField("event").downField("v_tracker").as[String] must beRight("lol") + } + } + } + + "Micro should handle bad data" in { + setup().use { client => + for { + _ <- client.run(Request(GET, uri"http://localhost:9090/i?e=pp&p=web&tv=lol&eid=invalidEventId")).use_ + all <- client.run(Request(GET, uri"http://localhost:9090/micro/all")).use(_.as[Json]) + bad <- client.run(Request(GET, uri"http://localhost:9090/micro/bad")).use(_.as[Json]) + } yield { + + bad.isArray must beTrue + bad.asArray.map(_.length).get must beEqualTo(1) + all.noSpaces must beEqualTo("""{"total":1,"good":0,"bad":1}""") + } + } + } + + "Micro should reset stored data" in { + setup().use { client => + for { + _ <- client.run(Request(GET, uri"http://localhost:9090/i?e=pp&p=web&tv=lol")).use_ + _ <- client.run(Request(GET, uri"http://localhost:9090/i?e=pp&p=web&tv=lol&eid=invalidEventId")).use_ + beforeReset <- client.run(Request(GET, uri"http://localhost:9090/micro/all")).use(_.as[Json]) + afterReset <- client.run(Request(GET, uri"http://localhost:9090/micro/reset")).use(_.as[Json]) + } yield { + + beforeReset.noSpaces must beEqualTo("""{"total":2,"good":1,"bad":1}""") + afterReset.noSpaces must beEqualTo("""{"total":0,"good":0,"bad":0}""") + } + } + } + + private def setup(): Resource[IO, Client[IO]] = { + for { + _ <- Main.run(List.empty).background + client <- BlazeClientBuilder.apply[IO].resource + _ <- Resource.sleep[IO](1.seconds) + } yield client + } +} \ No newline at end of file diff --git a/src/test/scala/com.snowplowanalytics.snowplow.micro/TestAdapterRegistry.scala b/src/test/scala/com.snowplowanalytics.snowplow.micro/TestAdapterRegistry.scala new file mode 100644 index 0000000..beac5b6 --- /dev/null +++ b/src/test/scala/com.snowplowanalytics.snowplow.micro/TestAdapterRegistry.scala @@ -0,0 +1,154 @@ +package com.snowplowanalytics.snowplow.micro + +import com.snowplowanalytics.snowplow.enrich.common.adapters._ + +object TestAdapterRegistry { + + val adaptersSchemas = AdaptersSchemas( + CallrailSchemas("iglu:com.callrail/call_complete/jsonschema/1-0-2"), + CloudfrontAccessLogSchemas( + "iglu:com.amazon.aws.cloudfront/wd_access_log/jsonschema/1-0-2", + "iglu:com.amazon.aws.cloudfront/wd_access_log/jsonschema/1-0-3", + "iglu:com.amazon.aws.cloudfront/wd_access_log/jsonschema/1-0-1", + "iglu:com.amazon.aws.cloudfront/wd_access_log/jsonschema/1-0-0", + "iglu:com.amazon.aws.cloudfront/wd_access_log/jsonschema/1-0-4", + "iglu:com.amazon.aws.cloudfront/wd_access_log/jsonschema/1-0-5", + "iglu:com.amazon.aws.cloudfront/wd_access_log/jsonschema/1-0-6" + ), + GoogleAnalyticsSchemas( + "iglu:com.google.analytics.measurement-protocol/page_view/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/screen_view/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/event/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/transaction/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/item/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/social/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/exception/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/timing/jsonschema/1-0-0", + "iglu:com.google.analytics/undocumented/jsonschema/1-0-0", + "iglu:com.google.analytics/private/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/general/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/user/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/session/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/traffic_source/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/system_info/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/link/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/app/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/product_action/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/content_experiment/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/hit/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/promotion_action/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/product/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/product_custom_dimension/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/product_custom_metric/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/product_impression_list/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/product_impression/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/product_impression_custom_dimension/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/product_impression_custom_metric/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/promotion/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/custom_dimension/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/custom_metric/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/content_group/jsonschema/1-0-0" + ), + HubspotSchemas( + "iglu:com.hubspot/contact_creation/jsonschema/1-0-0", + "iglu:com.hubspot/contact_deletion/jsonschema/1-0-0", + "iglu:com.hubspot/contact_change/jsonschema/1-0-0", + "iglu:com.hubspot/company_creation/jsonschema/1-0-0", + "iglu:com.hubspot/company_deletion/jsonschema/1-0-0", + "iglu:com.hubspot/company_change/jsonschema/1-0-0", + "iglu:com.hubspot/deal_creation/jsonschema/1-0-0", + "iglu:com.hubspot/deal_deletion/jsonschema/1-0-0", + "iglu:com.hubspot/deal_change/jsonschema/1-0-0" + ), + MailchimpSchemas( + "iglu:com.mailchimp/subscribe/jsonschema/1-0-0", + "iglu:com.mailchimp/unsubscribe/jsonschema/1-0-0", + "iglu:com.mailchimp/campaign_sending_status/jsonschema/1-0-0", + "iglu:com.mailchimp/cleaned_email/jsonschema/1-0-0", + "iglu:com.mailchimp/email_address_change/jsonschema/1-0-0", + "iglu:com.mailchimp/profile_update/jsonschema/1-0-0" + ), + MailgunSchemas( + "iglu:com.mailgun/message_bounced/jsonschema/1-0-0", + "iglu:com.mailgun/message_clicked/jsonschema/1-0-0", + "iglu:com.mailgun/message_complained/jsonschema/1-0-0", + "iglu:com.mailgun/message_delivered/jsonschema/1-0-0", + "iglu:com.mailgun/message_dropped/jsonschema/1-0-0", + "iglu:com.mailgun/message_opened/jsonschema/1-0-0", + "iglu:com.mailgun/recipient_unsubscribed/jsonschema/1-0-0" + ), + MandrillSchemas( + "iglu:com.mandrill/message_bounced/jsonschema/1-0-1", + "iglu:com.mandrill/message_clicked/jsonschema/1-0-1", + "iglu:com.mandrill/message_delayed/jsonschema/1-0-1", + "iglu:com.mandrill/message_delivered/jsonschema/1-0-0", + "iglu:com.mandrill/message_marked_as_spam/jsonschema/1-0-1", + "iglu:com.mandrill/message_opened/jsonschema/1-0-1", + "iglu:com.mandrill/message_rejected/jsonschema/1-0-0", + "iglu:com.mandrill/message_sent/jsonschema/1-0-0", + "iglu:com.mandrill/message_soft_bounced/jsonschema/1-0-1", + "iglu:com.mandrill/recipient_unsubscribed/jsonschema/1-0-1", + ), + MarketoSchemas("iglu:com.marketo/event/jsonschema/2-0-0"), + OlarkSchemas( + "iglu:com.olark/transcript/jsonschema/1-0-0", + "iglu:com.olark/offline_message/jsonschema/1-0-0" + ), + PagerdutySchemas( + "iglu:com.pagerduty/incident/jsonschema/1-0-0" + ), + PingdomSchemas( + "iglu:com.pingdom/incident_assign/jsonschema/1-0-0", + "iglu:com.pingdom/incident_notify_user/jsonschema/1-0-0", + "iglu:com.pingdom/incident_notify_of_close/jsonschema/1-0-0" + ), + SendgridSchemas( + "iglu:com.sendgrid/processed/jsonschema/3-0-0", + "iglu:com.sendgrid/dropped/jsonschema/3-0-0", + "iglu:com.sendgrid/delivered/jsonschema/3-0-0", + "iglu:com.sendgrid/deferred/jsonschema/3-0-0", + "iglu:com.sendgrid/bounce/jsonschema/3-0-0", + "iglu:com.sendgrid/open/jsonschema/3-0-0", + "iglu:com.sendgrid/click/jsonschema/3-0-0", + "iglu:com.sendgrid/spamreport/jsonschema/3-0-0", + "iglu:com.sendgrid/unsubscribe/jsonschema/3-0-0", + "iglu:com.sendgrid/group_unsubscribe/jsonschema/3-0-0", + "iglu:com.sendgrid/group_resubscribe/jsonschema/3-0-0" + ), + StatusGatorSchemas( + "iglu:com.statusgator/status_change/jsonschema/1-0-0" + ), + UnbounceSchemas( + "iglu:com.unbounce/form_post/jsonschema/1-0-0" + ), + UrbanAirshipSchemas( + "iglu:com.urbanairship.connect/CLOSE/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/CUSTOM/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/FIRST_OPEN/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/IN_APP_MESSAGE_DISPLAY/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/IN_APP_MESSAGE_EXPIRATION/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/IN_APP_MESSAGE_RESOLUTION/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/LOCATION/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/OPEN/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/PUSH_BODY/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/REGION/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/RICH_DELETE/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/RICH_DELIVERY/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/RICH_HEAD/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/SEND/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/TAG_CHANGE/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/UNINSTALL/jsonschema/1-0-0" + ), + VeroSchemas( + "iglu:com.getvero/bounced/jsonschema/1-0-0", + "iglu:com.getvero/clicked/jsonschema/1-0-0", + "iglu:com.getvero/delivered/jsonschema/1-0-0", + "iglu:com.getvero/opened/jsonschema/1-0-0", + "iglu:com.getvero/sent/jsonschema/1-0-0", + "iglu:com.getvero/unsubscribed/jsonschema/1-0-0", + "iglu:com.getvero/created/jsonschema/1-0-0", + "iglu:com.getvero/updated/jsonschema/1-0-0" + ) + ) + +}