-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Split out channel resource management into a generic Coldswap #15
Conversation
* acquires the next resource before closing the previous one. Whereas this Coldswap is "cold" | ||
* because it always closes any previous Resources before acquiring the next one. | ||
*/ | ||
trait Coldswap[F[_], A] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class can be moved out into common-streams. We can re-use it for the BigQuery loader.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I love Coldswap
, looks like it perfectly fits our use case 👏
...les/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala
Outdated
Show resolved
Hide resolved
modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Coldswap.scala
Outdated
Show resolved
Hide resolved
} yield response | ||
} yield { | ||
val make = actionRef.update(_ :+ OpenedChannel).as { | ||
new ChannelProvider[IO] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So now after introducing coldswap I started to think...is it really ChannelProvider
anymore? Maybe we could have slightly different interfaces now, sth like:
trait Channel[F[_]] {
def write(rows: Iterable[Map[String, AnyRef]]): F[Channel.WriteResult]
def close(): F[Unit]
}
trait ChannelProvider[F[_]] {
def open(): F[Channel[F]]
}
and
//or `RotatingChannel`??
type StatefulChannel[F[_]] = Coldswap[F, Channel[F]]
which would be a part of Environment
. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree it is not a provider any more, and I agree Channel
is now a better name, since the recent changes.
But I don't see a reason to change the interface to add open
or close
methods. Maybe I'm missing something?
I will push a commit that changes the name to Channel
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the idea with open/close was to isolate as much of snowflake sdk specific stuff and test as much as possible of our code.
- sdk
SnowflakeStreamingIngestClient
.openChannel ~> ourChannelProvider
.open() - sdk
SnowflakeStreamingIngestChannel
.close() ~> ourChannel
.close()
So then you could even test this part of making channel resource, which doesn't depend on any sdk specifics:
def makeResource[F[_]: Async](
channelName: String,
provider: ChannelProvider[F],
snowflakeHealth: SnowflakeHealth[F],
retriesConfig: Config.Retries,
monitoring: Monitoring[F]
): Resource[F, Channel[F]] = {
def make(poll: Poll[F]) = poll {
Logger[F].info(s"Opening channel $channelName") *>
SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) {
provider.open()
.onError { cause =>
monitoring.alert(Alert.FailedToOpenSnowflakeChannel(cause))
}
}
}
Resource.makeFull(make) { channel =>
Logger[F].info(s"Closing channel $channelName") *>
channel.close()
}
}
but then we also add health + monitoring to the mix. We can test some failure scenarios as well! (e.g. mock open channel failures) But I'm not sure we want to go that deep, so I'm very much OK with leaving it at as it is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made an attempt at this in #18
dff1a9f
to
f40ef81
Compare
This PR is best appreciated by first looking at the
Action
s in theProcessingSpec
. And then look at howChannelProvider
is much simpler than it was before.Oh by the way, this is completely untested.