Skip to content

Commit

Permalink
Acquiring channel should not block graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Jan 3, 2024
1 parent f20a8b8 commit a90ab76
Showing 1 changed file with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ package com.snowplowanalytics.snowplow.snowflake.processing
import cats.effect.implicits._
import cats.effect.kernel.{Ref, Resource}
import cats.effect.std.{Hotswap, Semaphore}
import cats.effect.{Async, Sync}
import cats.effect.{Async, Poll, Sync}
import cats.implicits._
import com.snowplowanalytics.snowplow.snowflake.{Alert, Config, Monitoring}
import net.snowflake.ingest.streaming.internal.SnowsFlakePlowInterop
Expand Down Expand Up @@ -201,16 +201,18 @@ object ChannelProvider {
.setDefaultTimezone(ZoneOffset.UTC)
.build

val make = Logger[F].info(s"Opening channel ${config.channel}") *>
SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) {
Async[F]
.blocking(client.openChannel(request))
.onError { cause =>
monitoring.alert(Alert.FailedToOpenSnowflakeChannel(cause))
}
}
def make(poll: Poll[F]) = poll {
Logger[F].info(s"Opening channel ${config.channel}") *>
SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) {
Async[F]
.blocking(client.openChannel(request))
.onError { cause =>
monitoring.alert(Alert.FailedToOpenSnowflakeChannel(cause))
}
}
}

Resource.make(make) { channel =>
Resource.makeFull(make) { channel =>
Logger[F].info(s"Closing channel ${config.channel}") *>
Async[F]
.fromCompletableFuture {
Expand Down

0 comments on commit a90ab76

Please sign in to comment.