Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split out channel resource management into a generic Coldswap #15

Merged
merged 1 commit into from
Jan 4, 2024

Conversation

istreeter
Copy link
Collaborator

@istreeter istreeter commented Dec 24, 2023

This PR is best appreciated by first looking at the Actions in the ProcessingSpec. And then look at how ChannelProvider is much simpler than it was before.

Oh by the way, this is completely untested.

* acquires the next resource before closing the previous one. Whereas this Coldswap is "cold"
* because it always closes any previous Resources before acquiring the next one.
*/
trait Coldswap[F[_], A] {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class can be moved out into common-streams. We can re-use it for the BigQuery loader.

@istreeter istreeter requested a review from pondzix December 24, 2023 21:23
Copy link
Contributor

@pondzix pondzix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love Coldswap, looks like it perfectly fits our use case 👏

} yield response
} yield {
val make = actionRef.update(_ :+ OpenedChannel).as {
new ChannelProvider[IO] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So now after introducing coldswap I started to think...is it really ChannelProvider anymore? Maybe we could have slightly different interfaces now, sth like:

trait Channel[F[_]] {

  def write(rows: Iterable[Map[String, AnyRef]]): F[Channel.WriteResult]
  def close(): F[Unit]
}
trait ChannelProvider[F[_]] {

  def open(): F[Channel[F]]
}

and

 //or `RotatingChannel`??
  type StatefulChannel[F[_]] = Coldswap[F, Channel[F]]

which would be a part of Environment. WDYT?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it is not a provider any more, and I agree Channel is now a better name, since the recent changes.

But I don't see a reason to change the interface to add open or close methods. Maybe I'm missing something?

I will push a commit that changes the name to Channel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the idea with open/close was to isolate as much of snowflake sdk specific stuff and test as much as possible of our code.

  • sdk SnowflakeStreamingIngestClient.openChannel ~> our ChannelProvider.open()
  • sdk SnowflakeStreamingIngestChannel.close() ~> our Channel.close()

So then you could even test this part of making channel resource, which doesn't depend on any sdk specifics:

  def makeResource[F[_]: Async](
    channelName: String,
    provider: ChannelProvider[F],
    snowflakeHealth: SnowflakeHealth[F],
    retriesConfig: Config.Retries,
    monitoring: Monitoring[F]
  ): Resource[F, Channel[F]] = {

    def make(poll: Poll[F]) = poll {
      Logger[F].info(s"Opening channel $channelName") *>
        SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) {
          provider.open()
            .onError { cause =>
              monitoring.alert(Alert.FailedToOpenSnowflakeChannel(cause))
            }
        }
    }

    Resource.makeFull(make) { channel =>
      Logger[F].info(s"Closing channel $channelName") *>
        channel.close()
    }
  }

but then we also add health + monitoring to the mix. We can test some failure scenarios as well! (e.g. mock open channel failures) But I'm not sure we want to go that deep, so I'm very much OK with leaving it at as it is.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made an attempt at this in #18

@istreeter istreeter merged commit f40ef81 into develop Jan 4, 2024
2 checks passed
@istreeter istreeter deleted the feature/coldswap branch January 4, 2024 10:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants