Skip to content

Commit

Permalink
Amendment 1: Fixed tests
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Jan 3, 2024
1 parent bec1905 commit 651afda
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import fs2.Stream

import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, TableManager}
import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, SnowflakeHealth, TableManager}
import com.snowplowanalytics.snowplow.runtime.AppInfo

import scala.concurrent.duration.{DurationInt, FiniteDuration}
Expand All @@ -32,9 +32,20 @@ object MockEnvironment {
case object ClosedChannel extends Action
case object OpenedChannel extends Action
case class WroteRowsToSnowflake(rowCount: Int) extends Action

/** Metrics * */
case class AddedGoodCountMetric(count: Int) extends Action
case class AddedBadCountMetric(count: Int) extends Action
case class SetLatencyMetric(millis: Long) extends Action

/** Alerts * */
case object SentFailedToCreateEventsTableAlert extends Action
case object SentFailedToAddColumnsAlert extends Action
case object SentFailedToOpenSnowflakeChannelAlert extends Action

/** Health probe * */
case object BecomeHealthy extends Action
case object BecomeUnhealthy extends Action
}
import Action._

Expand All @@ -51,16 +62,21 @@ object MockEnvironment {
def build(inputs: List[TokenedEvents], channelResponses: List[Channel.WriteResult] = Nil): IO[MockEnvironment] =
for {
state <- Ref[IO].of(Vector.empty[Action])
channel <- testChannel(state, channelResponses)
channelProvider <- testChannelProvider(state, channelResponses)
} yield {
val env = Environment(
appInfo = appInfo,
source = testSourceAndAck(inputs, state),
badSink = testSink(state),
httpClient = testHttpClient,
tableManager = testTableManager(state),
channel = channel,
metrics = testMetrics(state),
appInfo = appInfo,
source = testSourceAndAck(inputs, state),
badSink = testSink(state),
httpClient = testHttpClient,
tableManager = testTableManager(state),
channelProvider = channelProvider,
metrics = testMetrics(state),
monitoring = testMonitoring(state),
snowflakeHealth = testSnowflakeHealth(state),
retries = Config.Retries(
backoff = 30.seconds
),
batching = Config.Batching(
maxBytes = 16000000,
maxDelay = 10.seconds,
Expand Down Expand Up @@ -112,40 +128,43 @@ object MockEnvironment {
}

/**
* Mocked implementation of a `Channel`
* Mocked implementation of a `Channel.Provider`
*
* @param actionRef
* Global Ref used to accumulate actions that happened
* @param responses
* Responses that this mocked Channel should return each time someone calls `write`. If no
* responses given, then it will return with a successful response.
*/
private def testChannel(
private def testChannelProvider(
actionRef: Ref[IO, Vector[Action]],
responses: List[Channel.WriteResult]
): IO[Resource[IO, Channel[IO]]] =
): IO[Channel.Provider[IO]] =
for {
responseRef <- Ref[IO].of(responses)
} yield {
val make = actionRef.update(_ :+ OpenedChannel).as {
new Channel[IO] {
def write(rows: Iterable[Map[String, AnyRef]]): IO[Channel.WriteResult] =
for {
response <- responseRef.modify {
case head :: tail => (tail, head)
case Nil => (Nil, Channel.WriteResult.WriteFailures(Nil))
}
_ <- response match {
case Channel.WriteResult.WriteFailures(failures) =>
actionRef.update(_ :+ WroteRowsToSnowflake(rows.size - failures.size))
case Channel.WriteResult.ChannelIsInvalid =>
IO.unit
}
} yield response
}
}

Resource.make(make)(_ => actionRef.update(_ :+ ClosedChannel))
} yield new Channel.Provider[IO] {
def open: IO[Channel[IO]] =
actionRef.update(_ :+ OpenedChannel).as(testChannel(actionRef, responseRef))
}

private def testChannel(actionRef: Ref[IO, Vector[Action]], responseRef: Ref[IO, List[Channel.WriteResult]]): Channel[IO] =
new Channel[IO] {
def write(rows: Iterable[Map[String, AnyRef]]): IO[Channel.WriteResult] =
for {
response <- responseRef.modify {
case head :: tail => (tail, head)
case Nil => (Nil, Channel.WriteResult.WriteFailures(Nil))
}
_ <- response match {
case Channel.WriteResult.WriteFailures(failures) =>
actionRef.update(_ :+ WroteRowsToSnowflake(rows.size - failures.size))
case Channel.WriteResult.ChannelIsInvalid =>
IO.unit
}
} yield response

def close: IO[Unit] =
actionRef.update(_ :+ ClosedChannel)
}

def testMetrics(ref: Ref[IO, Vector[Action]]): Metrics[IO] = new Metrics[IO] {
Expand All @@ -160,4 +179,19 @@ object MockEnvironment {

def report: Stream[IO, Nothing] = Stream.never[IO]
}

def testMonitoring(ref: Ref[IO, Vector[Action]]): Monitoring[IO] = new Monitoring[IO] {
def alert(message: Alert): IO[Unit] = message match {
case Alert.FailedToCreateEventsTable(_) => ref.update(_ :+ SentFailedToCreateEventsTableAlert)
case Alert.FailedToAddColumns(_, _) => ref.update(_ :+ SentFailedToAddColumnsAlert)
case Alert.FailedToOpenSnowflakeChannel(_) => ref.update(_ :+ SentFailedToOpenSnowflakeChannelAlert)
}
}

def testSnowflakeHealth(ref: Ref[IO, Vector[Action]]): SnowflakeHealth[IO] = new SnowflakeHealth[IO] {
def setHealthy(): IO[Unit] =
ref.update(_ :+ BecomeHealthy)
def setUnhealthy(): IO[Unit] =
ref.update(_ :+ BecomeUnhealthy)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class ProcessingSpec extends Specification with CatsEffect {
Vector(
Action.InitEventsTable,
Action.OpenedChannel,
Action.BecomeHealthy,
Action.WroteRowsToSnowflake(4),
Action.AddedGoodCountMetric(4),
Action.AddedBadCountMetric(0),
Expand Down Expand Up @@ -87,6 +88,7 @@ class ProcessingSpec extends Specification with CatsEffect {
Vector(
Action.InitEventsTable,
Action.OpenedChannel,
Action.BecomeHealthy,
Action.WroteRowsToSnowflake(6),
Action.SentToBad(6),
Action.AddedGoodCountMetric(6),
Expand Down Expand Up @@ -115,10 +117,12 @@ class ProcessingSpec extends Specification with CatsEffect {
Vector(
Action.InitEventsTable,
Action.OpenedChannel,
Action.BecomeHealthy,
Action.WroteRowsToSnowflake(1),
Action.ClosedChannel,
Action.AlterTableAddedColumns(List("unstruct_event_xyz_1", "contexts_abc_2")),
Action.OpenedChannel,
Action.BecomeHealthy,
Action.WroteRowsToSnowflake(1),
Action.AddedGoodCountMetric(2),
Action.AddedBadCountMetric(0),
Expand Down Expand Up @@ -147,6 +151,7 @@ class ProcessingSpec extends Specification with CatsEffect {
Vector(
Action.InitEventsTable,
Action.OpenedChannel,
Action.BecomeHealthy,
Action.WroteRowsToSnowflake(1),
Action.SentToBad(1),
Action.AddedGoodCountMetric(1),
Expand Down Expand Up @@ -176,6 +181,7 @@ class ProcessingSpec extends Specification with CatsEffect {
Vector(
Action.InitEventsTable,
Action.OpenedChannel,
Action.BecomeHealthy,
Action.WroteRowsToSnowflake(1),
Action.ClosedChannel
)
Expand All @@ -197,8 +203,10 @@ class ProcessingSpec extends Specification with CatsEffect {
Vector(
Action.InitEventsTable,
Action.OpenedChannel,
Action.BecomeHealthy,
Action.ClosedChannel,
Action.OpenedChannel,
Action.BecomeHealthy,
Action.WroteRowsToSnowflake(2),
Action.AddedGoodCountMetric(2),
Action.AddedBadCountMetric(0),
Expand Down Expand Up @@ -228,6 +236,7 @@ class ProcessingSpec extends Specification with CatsEffect {
Action.SetLatencyMetric(42123),
Action.SetLatencyMetric(42123),
Action.OpenedChannel,
Action.BecomeHealthy,
Action.WroteRowsToSnowflake(4),
Action.AddedGoodCountMetric(4),
Action.AddedBadCountMetric(0),
Expand Down

0 comments on commit 651afda

Please sign in to comment.