diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala index f7b5a6a..0d2c59d 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala @@ -10,7 +10,7 @@ package com.snowplowanalytics.snowplow.snowflake.processing import cats.effect.implicits._ import cats.effect.kernel.{Ref, Resource} import cats.effect.std.{Hotswap, Semaphore} -import cats.effect.{Async, Sync} +import cats.effect.{Async, Poll, Sync} import cats.implicits._ import com.snowplowanalytics.snowplow.snowflake.{Alert, Config, Monitoring} import net.snowflake.ingest.streaming.internal.SnowsFlakePlowInterop @@ -201,16 +201,18 @@ object ChannelProvider { .setDefaultTimezone(ZoneOffset.UTC) .build - val make = Logger[F].info(s"Opening channel ${config.channel}") *> - SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) { - Async[F] - .blocking(client.openChannel(request)) - .onError { cause => - monitoring.alert(Alert.FailedToOpenSnowflakeChannel(cause)) - } - } + def make(poll: Poll[F]) = poll { + Logger[F].info(s"Opening channel ${config.channel}") *> + SnowflakeRetrying.retryIndefinitely(snowflakeHealth, retriesConfig) { + Async[F] + .blocking(client.openChannel(request)) + .onError { cause => + monitoring.alert(Alert.FailedToOpenSnowflakeChannel(cause)) + } + } + } - Resource.make(make) { channel => + Resource.makeFull(make) { channel => Logger[F].info(s"Closing channel ${config.channel}") *> Async[F] .fromCompletableFuture {