diff --git a/core/src/it/scala/com/wixpress/dst/greyhound/core/AdminClientIT.scala b/core/src/it/scala/com/wixpress/dst/greyhound/core/AdminClientIT.scala index 95d2353a..dec5377c 100644 --- a/core/src/it/scala/com/wixpress/dst/greyhound/core/AdminClientIT.scala +++ b/core/src/it/scala/com/wixpress/dst/greyhound/core/AdminClientIT.scala @@ -10,7 +10,7 @@ import com.wixpress.dst.greyhound.core.producer.ProducerRecord import com.wixpress.dst.greyhound.core.testkit.{BaseTestWithSharedEnv, TestMetrics} import com.wixpress.dst.greyhound.core.zioutils.CountDownLatch import com.wixpress.dst.greyhound.testenv.ITEnv -import com.wixpress.dst.greyhound.testenv.ITEnv.{Env, TestResources, testResources} +import com.wixpress.dst.greyhound.testenv.ITEnv.{testResources, Env, TestResources} import org.apache.kafka.common.config.TopicConfig.{DELETE_RETENTION_MS_CONFIG, MAX_MESSAGE_BYTES_CONFIG, RETENTION_MS_CONFIG} import org.apache.kafka.common.errors.InvalidTopicException import org.specs2.specification.core.Fragments @@ -83,7 +83,7 @@ class AdminClientIT extends BaseTestWithSharedEnv[Env, TestResources] { } } - //todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged + // todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged // "reflect errors" in { // val topic1 = aTopicConfig() // val topic2 = aTopicConfig("x" * 250) @@ -104,7 +104,7 @@ class AdminClientIT extends BaseTestWithSharedEnv[Env, TestResources] { // created === Map(badTopic.name -> None) // } // } - //todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged + // todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged // ================================================================================================================================= "ignore TopicExistsException by default" in { val topic = aTopicConfig() diff --git a/core/src/it/scala/com/wixpress/dst/greyhound/core/BUILD.bazel b/core/src/it/scala/com/wixpress/dst/greyhound/core/BUILD.bazel index 28d79c08..9735d210 100644 --- a/core/src/it/scala/com/wixpress/dst/greyhound/core/BUILD.bazel +++ b/core/src/it/scala/com/wixpress/dst/greyhound/core/BUILD.bazel @@ -24,13 +24,8 @@ specs2_ite2e_test( "//core/src/main/scala/com/wixpress/dst/greyhound/core/producer", "//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils", "//core/src/test/resources", - #"//core/src/test/scala/com/wixpress/dst/greyhound/core/consumer", - #"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit", - "@ch_qos_logback_logback_classic", # "@dev_zio_izumi_reflect_2_12", "@dev_zio_zio_2_12", - "@dev_zio_zio_test_2_12", - "@org_apache_kafka_kafka_2_12", "@org_apache_kafka_kafka_clients", "//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit", ], diff --git a/core/src/it/scala/com/wixpress/dst/greyhound/core/offset/BUILD.bazel b/core/src/it/scala/com/wixpress/dst/greyhound/core/offset/BUILD.bazel index ce9f7faf..a5c08ae5 100644 --- a/core/src/it/scala/com/wixpress/dst/greyhound/core/offset/BUILD.bazel +++ b/core/src/it/scala/com/wixpress/dst/greyhound/core/offset/BUILD.bazel @@ -13,7 +13,6 @@ specs2_ite2e_test( "@dev_zio_izumi_reflect_2_12", "@dev_zio_zio_managed_2_12", "//core/src/it/resources", - "//core/src/it/scala/com/wixpress/dst/greyhound/core", "//core/src/it/scala/com/wixpress/dst/greyhound/testenv", "//core/src/it/scala/com/wixpress/dst/greyhound/testkit", "//core/src/main/scala/com/wixpress/dst/greyhound/core", @@ -23,9 +22,7 @@ specs2_ite2e_test( "//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics", "//core/src/main/scala/com/wixpress/dst/greyhound/core/producer", "//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils", - "//core/src/test/scala/com/wixpress/dst/greyhound/core/consumer", "//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit", - "@ch_qos_logback_logback_classic", # "@dev_zio_izumi_reflect_2_12", "@dev_zio_zio_2_12", "@org_apache_kafka_kafka_clients", diff --git a/core/src/it/scala/com/wixpress/dst/greyhound/core/rabalance/BUILD.bazel b/core/src/it/scala/com/wixpress/dst/greyhound/core/rabalance/BUILD.bazel index 3da73eba..c763308a 100644 --- a/core/src/it/scala/com/wixpress/dst/greyhound/core/rabalance/BUILD.bazel +++ b/core/src/it/scala/com/wixpress/dst/greyhound/core/rabalance/BUILD.bazel @@ -13,7 +13,6 @@ specs2_ite2e_test( "@dev_zio_izumi_reflect_2_12", "@dev_zio_zio_managed_2_12", "//core/src/it/resources", - "//core/src/it/scala/com/wixpress/dst/greyhound/core", "//core/src/it/scala/com/wixpress/dst/greyhound/testenv", "//core/src/it/scala/com/wixpress/dst/greyhound/testkit", "//core/src/main/scala/com/wixpress/dst/greyhound/core", @@ -23,9 +22,7 @@ specs2_ite2e_test( "//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics", "//core/src/main/scala/com/wixpress/dst/greyhound/core/producer", "//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils", - "//core/src/test/scala/com/wixpress/dst/greyhound/core/consumer", "//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit", - "@ch_qos_logback_logback_classic", # "@dev_zio_izumi_reflect_2_12", "@dev_zio_zio_2_12", "@org_apache_kafka_kafka_clients", diff --git a/core/src/it/scala/com/wixpress/dst/greyhound/core/retry/BUILD.bazel b/core/src/it/scala/com/wixpress/dst/greyhound/core/retry/BUILD.bazel index 3f83ff43..cb001169 100644 --- a/core/src/it/scala/com/wixpress/dst/greyhound/core/retry/BUILD.bazel +++ b/core/src/it/scala/com/wixpress/dst/greyhound/core/retry/BUILD.bazel @@ -12,7 +12,6 @@ specs2_ite2e_test( deps = [ "@dev_zio_izumi_reflect_2_12", "@dev_zio_zio_managed_2_12", - "//core/src/it/scala/com/wixpress/dst/greyhound/core", "//core/src/it/scala/com/wixpress/dst/greyhound/testenv", "//core/src/it/scala/com/wixpress/dst/greyhound/testkit", "//core/src/main/scala/com/wixpress/dst/greyhound/core", @@ -21,11 +20,8 @@ specs2_ite2e_test( "//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry", "//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics", "//core/src/main/scala/com/wixpress/dst/greyhound/core/producer", - "//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils", "//core/src/test/resources", - "//core/src/test/scala/com/wixpress/dst/greyhound/core/consumer", "//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit", - "@ch_qos_logback_logback_classic", # "@dev_zio_izumi_reflect_2_12", "@dev_zio_zio_2_12", "@org_apache_kafka_kafka_clients", diff --git a/core/src/it/scala/com/wixpress/dst/greyhound/testenv/BUILD.bazel b/core/src/it/scala/com/wixpress/dst/greyhound/testenv/BUILD.bazel index 35ab06f9..6b5f15d9 100644 --- a/core/src/it/scala/com/wixpress/dst/greyhound/testenv/BUILD.bazel +++ b/core/src/it/scala/com/wixpress/dst/greyhound/testenv/BUILD.bazel @@ -13,15 +13,11 @@ scala_library( "@dev_zio_zio_managed_2_12", "//core/src/it/scala/com/wixpress/dst/greyhound/testkit", "//core/src/main/scala/com/wixpress/dst/greyhound/core", - "//core/src/main/scala/com/wixpress/dst/greyhound/core/admin", "//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics", "//core/src/main/scala/com/wixpress/dst/greyhound/core/producer", "//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit", # "@dev_zio_izumi_reflect_2_12", "@dev_zio_zio_2_12", "@dev_zio_zio_test_2_12", - "@org_apache_curator_curator_test", - "@org_apache_kafka_kafka_2_12", - "@org_apache_kafka_kafka_clients", ], ) diff --git a/core/src/it/scala/com/wixpress/dst/greyhound/testkit/BUILD.bazel b/core/src/it/scala/com/wixpress/dst/greyhound/testkit/BUILD.bazel index fb990677..f3345b95 100644 --- a/core/src/it/scala/com/wixpress/dst/greyhound/testkit/BUILD.bazel +++ b/core/src/it/scala/com/wixpress/dst/greyhound/testkit/BUILD.bazel @@ -16,7 +16,6 @@ scala_library( "//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics", # "//core/src/main/scala/com/wixpress/dst/greyhound/core/producer", "@dev_zio_zio_2_12", - "@dev_zio_zio_test_2_12", "@org_apache_curator_curator_test", "@org_apache_kafka_kafka_2_12", "@org_apache_kafka_kafka_clients", diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Consumer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Consumer.scala index 3a53c551..7592362e 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Consumer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Consumer.scala @@ -382,7 +382,10 @@ object UnsafeOffsetOperations { } override def offsetsForTimes(partitions: Set[TopicPartition], timeEpoch: Long, timeout: Duration): Map[TopicPartition, Option[Long]] = - consumer.offsetsForTimes(partitions.map(_.asKafka).map(tp => (tp, new lang.Long(timeEpoch))).toMap.asJava, timeout) - .asScala.toMap.map { case (tp, of) => TopicPartition(tp) -> (Option(of).map(_.offset())) } + consumer + .offsetsForTimes(partitions.map(_.asKafka).map(tp => (tp, new lang.Long(timeEpoch))).toMap.asJava, timeout) + .asScala + .toMap + .map { case (tp, of) => TopicPartition(tp) -> (Option(of).map(_.offset())) } } } diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala index 6901b5c9..5985f80d 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala @@ -56,14 +56,14 @@ object EventLoop { partitionsAssigned <- Promise.make[Nothing, Unit] // TODO how to handle errors in subscribe? rebalanceListener = listener(pausedPartitionsRef, config, dispatcher, partitionsAssigned, group, consumer, clientId, offsets) - _ <- report(SubscribingToInitialSubAndRebalanceListener(clientId, group, consumerAttributes)) + _ <- report(SubscribingToInitialSubAndRebalanceListener(clientId, group, consumerAttributes)) _ <- subscribe(initialSubscription, rebalanceListener)(consumer) running <- Ref.make[EventLoopState](Running) - _ <- report(CreatingPollOnceFiber(clientId, group, consumerAttributes)) + _ <- report(CreatingPollOnceFiber(clientId, group, consumerAttributes)) fiber <- pollOnce(running, consumer, dispatcher, pausedPartitionsRef, positionsRef, offsets, config, clientId, group) .repeatWhile(_ == true) .forkDaemon - _ <- report(AwaitingPartitionsAssignment(clientId, group, consumerAttributes)) + _ <- report(AwaitingPartitionsAssignment(clientId, group, consumerAttributes)) _ <- partitionsAssigned.await env <- ZIO.environment[Env] } yield (dispatcher, fiber, offsets, positionsRef, running, rebalanceListener.provideEnvironment(env)) @@ -303,9 +303,11 @@ object EventLoopMetric { case class FailedToUpdatePositions(t: Throwable, clientId: ClientId, attributes: Map[String, String] = Map.empty) extends EventLoopMetric - case class CreatingDispatcher(clientId: ClientId, group: Group, attributes: Map[String, String], startPaused: Boolean) extends EventLoopMetric + case class CreatingDispatcher(clientId: ClientId, group: Group, attributes: Map[String, String], startPaused: Boolean) + extends EventLoopMetric - case class SubscribingToInitialSubAndRebalanceListener(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric + case class SubscribingToInitialSubAndRebalanceListener(clientId: ClientId, group: Group, attributes: Map[String, String]) + extends EventLoopMetric case class CreatingPollOnceFiber(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializer.scala index 97f1b341..e684eef7 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializer.scala @@ -40,8 +40,10 @@ class OffsetsInitializer( offsetOperations.pause(toPause) val rewindUncommittedOffsets = if (offsetResetIsEarliest || notCommitted.isEmpty || rewindUncommittedOffsetsBy.isZero) Map.empty - else offsetOperations.offsetsForTimes(notCommitted, clock.millis() - rewindUncommittedOffsetsBy.toMillis, effectiveTimeout) - .map{case (tp, maybeRewindedOffset) => (tp, maybeRewindedOffset.orElse(endOffsets.get(tp)).getOrElse(0L))} + else + offsetOperations + .offsetsForTimes(notCommitted, clock.millis() - rewindUncommittedOffsetsBy.toMillis, effectiveTimeout) + .map { case (tp, maybeRewindedOffset) => (tp, maybeRewindedOffset.orElse(endOffsets.get(tp)).getOrElse(0L)) } val positions = notCommitted.map(tp => tp -> offsetOperations.position(tp, effectiveTimeout)).toMap ++ toOffsets ++ rewindUncommittedOffsets diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/ReportingConsumer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/ReportingConsumer.scala index 96a929f8..d0f93a00 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/ReportingConsumer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/ReportingConsumer.scala @@ -41,10 +41,14 @@ case class ReportingConsumer(clientId: ClientId, group: Group, internal: Consume implicit trace: Trace ): UIO[DelayedRebalanceEffect] = (report(PartitionsRevoked(clientId, group, partitions, config.consumerAttributes)) *> - rebalanceListener.onPartitionsRevoked(consumer, partitions) - .timed.tap { case (duration, _) => report(PartitionsRevokedComplete(clientId, group, partitions, config.consumerAttributes, duration.toMillis)) } - .map(_._2) - ).provideEnvironment(r) + rebalanceListener + .onPartitionsRevoked(consumer, partitions) + .timed + .tap { + case (duration, _) => + report(PartitionsRevokedComplete(clientId, group, partitions, config.consumerAttributes, duration.toMillis)) + } + .map(_._2)).provideEnvironment(r) override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[Any] = (report(PartitionsAssigned(clientId, group, partitions, config.consumerAttributes)) *> @@ -241,11 +245,13 @@ object ConsumerMetric { attributes: Map[String, String] = Map.empty ) extends ConsumerMetric - case class PartitionsRevokedComplete(clientId: ClientId, - group: Group, - partitions: Set[TopicPartition], - attributes: Map[String, String] = Map.empty, - durationMs: Long) extends ConsumerMetric + case class PartitionsRevokedComplete( + clientId: ClientId, + group: Group, + partitions: Set[TopicPartition], + attributes: Map[String, String] = Map.empty, + durationMs: Long + ) extends ConsumerMetric case class SubscribeFailed(clientId: ClientId, group: Group, error: Throwable, attributes: Map[String, String] = Map.empty) extends ConsumerMetric diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/NonBlockingRetryHelper.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/NonBlockingRetryHelper.scala index c8dd3edd..a73b7dce 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/NonBlockingRetryHelper.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/NonBlockingRetryHelper.scala @@ -1,18 +1,18 @@ package com.wixpress.dst.greyhound.core.consumer.retry -import java.time.{Instant, Duration => JavaDuration} +import java.time.{Duration => JavaDuration, Instant} import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.regex.Pattern import com.wixpress.dst.greyhound.core.Serdes.StringSerde import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription.{TopicPattern, Topics} -import com.wixpress.dst.greyhound.core.consumer.retry.RetryAttempt.{RetryAttemptNumber, currentTime} +import com.wixpress.dst.greyhound.core.consumer.retry.RetryAttempt.{currentTime, RetryAttemptNumber} import com.wixpress.dst.greyhound.core.consumer.retry.RetryDecision.{NoMoreRetries, RetryWith} import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, ConsumerSubscription} import com.wixpress.dst.greyhound.core.consumer.retry.RetryRecordHandlerMetric.WaitingForRetry import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics.report import com.wixpress.dst.greyhound.core.producer.ProducerRecord -import com.wixpress.dst.greyhound.core.{Group, Headers, Topic, durationDeserializer, instantDeserializer} +import com.wixpress.dst.greyhound.core.{durationDeserializer, instantDeserializer, Group, Headers, Topic} import zio.Clock import zio.Duration import zio.Schedule.spaced @@ -24,14 +24,14 @@ trait NonBlockingRetryHelper { def retryTopicsFor(originalTopic: Topic): Set[Topic] def retryAttempt(topic: Topic, headers: Headers, subscription: ConsumerSubscription)( - implicit trace: Trace + implicit trace: Trace ): UIO[Option[RetryAttempt]] def retryDecision[E]( - retryAttempt: Option[RetryAttempt], - record: ConsumerRecord[Chunk[Byte], Chunk[Byte]], - error: E, - subscription: ConsumerSubscription + retryAttempt: Option[RetryAttempt], + record: ConsumerRecord[Chunk[Byte], Chunk[Byte]], + error: E, + subscription: ConsumerSubscription )(implicit trace: Trace): URIO[Any, RetryDecision] def retrySteps = retryTopicsFor("").size @@ -47,46 +47,44 @@ object NonBlockingRetryHelper { .getOrElse(NonBlockingBackoffPolicy.empty) override def retryTopicsFor(topic: Topic): Set[Topic] = - policy(topic).intervals.indices.foldLeft(Set.empty[String])((acc, attempt) => - acc + s"$topic-$group-retry-$attempt" - ) + policy(topic).intervals.indices.foldLeft(Set.empty[String])((acc, attempt) => acc + s"$topic-$group-retry-$attempt") override def retryAttempt(topic: Topic, headers: Headers, subscription: ConsumerSubscription)( - implicit trace: Trace + implicit trace: Trace ): UIO[Option[RetryAttempt]] = { (for { submitted <- headers.get(RetryHeader.Submitted, instantDeserializer) backoff <- headers.get(RetryHeader.Backoff, durationDeserializer) originalTopic <- headers.get[String](RetryHeader.OriginalTopic, StringSerde) } yield for { - ta <- topicAttempt(subscription, topic, originalTopic) + ta <- topicAttempt(subscription, topic, originalTopic) TopicAttempt(originalTopic, attempt) = ta - s <- submitted - b <- backoff + s <- submitted + b <- backoff } yield RetryAttempt(originalTopic, attempt, s, b)) .catchAll(_ => ZIO.none) } private def topicAttempt( - subscription: ConsumerSubscription, - topic: Topic, - originalTopicHeader: Option[String] + subscription: ConsumerSubscription, + topic: Topic, + originalTopicHeader: Option[String] ) = subscription match { - case _: Topics => extractTopicAttempt(group, topic) + case _: Topics => extractTopicAttempt(group, topic) case _: TopicPattern => extractTopicAttemptFromPatternRetryTopic(group, topic, originalTopicHeader) } override def retryDecision[E]( - retryAttempt: Option[RetryAttempt], - record: ConsumerRecord[Chunk[Byte], Chunk[Byte]], - error: E, - subscription: ConsumerSubscription + retryAttempt: Option[RetryAttempt], + record: ConsumerRecord[Chunk[Byte], Chunk[Byte]], + error: E, + subscription: ConsumerSubscription )(implicit trace: Trace): URIO[Any, RetryDecision] = currentTime.map(now => { val nextRetryAttempt = retryAttempt.fold(0)(_.attempt + 1) val originalTopic = retryAttempt.fold(record.topic)(_.originalTopic) - val retryTopic = subscription match { + val retryTopic = subscription match { case _: TopicPattern => patternRetryTopic(group, nextRetryAttempt) case _: Topics => fixedRetryTopic(originalTopic, group, nextRetryAttempt) } @@ -123,19 +121,19 @@ object NonBlockingRetryHelper { inputTopic.split(s"-$group-retry-").toSeq match { case Seq(topic, attempt) if Try(attempt.toInt).isSuccess => Some(TopicAttempt(topic, attempt.toInt)) - case _ => None + case _ => None } private def extractTopicAttemptFromPatternRetryTopic[E]( - group: Group, - inputTopic: Topic, - originalTopicHeader: Option[String] + group: Group, + inputTopic: Topic, + originalTopicHeader: Option[String] ) = { originalTopicHeader.flatMap(originalTopic => { inputTopic.split(s"__gh_pattern-retry-$group-attempt-").toSeq match { case Seq(_, attempt) if Try(attempt.toInt).isSuccess => Some(TopicAttempt(originalTopic, attempt.toInt)) - case _ => None + case _ => None } }) } @@ -176,14 +174,14 @@ object RetryHeader { } case class RetryAttempt( - originalTopic: Topic, - attempt: RetryAttemptNumber, - submittedAt: Instant, - backoff: Duration + originalTopic: Topic, + attempt: RetryAttemptNumber, + submittedAt: Instant, + backoff: Duration ) { def sleep(implicit trace: Trace): URIO[GreyhoundMetrics, Unit] = - (RetryUtil.sleep(submittedAt, backoff) race reportWaitingInIntervals(every = 60.seconds)) + RetryUtil.sleep(submittedAt, backoff) race reportWaitingInIntervals(every = 60.seconds) private def reportWaitingInIntervals(every: Duration) = report(WaitingForRetry(originalTopic, attempt, submittedAt.toEpochMilli, backoff.toMillis)) diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/NonBlockingRetryRecordHandler.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/NonBlockingRetryRecordHandler.scala index b6e410d3..a15f1e5b 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/NonBlockingRetryRecordHandler.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/NonBlockingRetryRecordHandler.scala @@ -49,14 +49,17 @@ private[retry] object NonBlockingRetryRecordHandler { } private def delayRetry(record: ConsumerRecord[_, _], awaitShutdown: TopicPartition => UIO[AwaitShutdown])( - retryAttempt: RetryAttempt) = + retryAttempt: RetryAttempt + ) = zio.Random.nextInt.flatMap(correlationId => report( WaitingBeforeRetry(record.topic, retryAttempt, record.partition, record.offset, correlationId) ) *> awaitShutdown(record.topicPartition) .flatMap(_.interruptOnShutdown(retryAttempt.sleep)) - .reporting(r => DoneWaitingBeforeRetry(record.topic, record.partition, record.offset, retryAttempt, r.duration, r.failed, correlationId)) + .reporting(r => + DoneWaitingBeforeRetry(record.topic, record.partition, record.offset, retryAttempt, r.duration, r.failed, correlationId) + ) ) override def isHandlingRetryTopicMessage(group: Group, record: ConsumerRecord[K, V]): Boolean = { diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryRecordHandlerMetric.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryRecordHandlerMetric.scala index 980a71f7..b1580b16 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryRecordHandlerMetric.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryRecordHandlerMetric.scala @@ -20,7 +20,8 @@ object RetryRecordHandlerMetric { case class NoRetryOnNonRetryableFailure(partition: TopicPartition, offset: Long, cause: Exception) extends RetryRecordHandlerMetric case object Silent extends RetryRecordHandlerMetric - case class WaitingBeforeRetry(retryTopic: Topic, retryAttempt: RetryAttempt, partition: Int, offset:Long, correlationId: Int) extends RetryRecordHandlerMetric + case class WaitingBeforeRetry(retryTopic: Topic, retryAttempt: RetryAttempt, partition: Int, offset: Long, correlationId: Int) + extends RetryRecordHandlerMetric case class DoneWaitingBeforeRetry( retryTopic: Topic, diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/BUILD.bazel b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/BUILD.bazel index 90ed4056..3e104e18 100644 --- a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/BUILD.bazel +++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/BUILD.bazel @@ -11,17 +11,14 @@ specs2_unit_test( deps = [ "@dev_zio_zio_managed_2_12", "@dev_zio_zio_stacktracer_2_12", - "//core/src/it/scala/com/wixpress/dst/greyhound/testkit", "//core/src/main/scala/com/wixpress/dst/greyhound/core", "//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer", "//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/domain", "//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry", "//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics", - "//core/src/main/scala/com/wixpress/dst/greyhound/core/producer", "//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils", "//core/src/test/resources", "//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit", - "@ch_qos_logback_logback_classic", # "@dev_zio_izumi_reflect_2_12", "@dev_zio_zio_2_12", "@dev_zio_zio_streams_2_12", diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializerTest.scala b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializerTest.scala index d8c0c28e..5e708452 100644 --- a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializerTest.scala +++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializerTest.scala @@ -16,7 +16,7 @@ class OffsetsInitializerTest extends SpecificationWithJUnit with Mockito { private val Seq(p1, p2, p3) = Seq("t1" -> 1, "t2" -> 2, "t3" -> 3).map(tp => TopicPartition(tp._1, tp._2)) private val partitions = Set(p1, p2, p3) private val p1Pos, p2Pos, p3Pos = randomInt.toLong - val epochTimeToRewind = 1000L + val epochTimeToRewind = 1000L "do nothing if no missing offsets" in new ctx { @@ -124,71 +124,73 @@ class OffsetsInitializerTest extends SpecificationWithJUnit with Mockito { reported must contain(CommittedMissingOffsetsFailed(clientId, group, partitions, Map.empty, elapsed = Duration.ZERO, e)) } - "rewind uncommitted offsets" in new ctx { - givenCommittedOffsets(partitions)(Map(p2 -> randomInt)) - givenPositions(p2 -> p2Pos, p3 -> p3Pos) - givenOffsetsForTimes(epochTimeToRewind, p1 -> 0L, p2 -> 1L) - - committer.initializeOffsets(partitions) + "rewind uncommitted offsets" in + new ctx { + givenCommittedOffsets(partitions)(Map(p2 -> randomInt)) + givenPositions(p2 -> p2Pos, p3 -> p3Pos) + givenOffsetsForTimes(epochTimeToRewind, p1 -> 0L, p2 -> 1L) - val missingOffsets = Map( - p1 -> p1Pos, - p3 -> p3Pos - ) + committer.initializeOffsets(partitions) - val rewindedOffsets = Map( - p1 -> 0L, - ) + val missingOffsets = Map( + p1 -> p1Pos, + p3 -> p3Pos + ) - there was - one(offsetOps).commit( - missingOffsets ++ rewindedOffsets, - timeout + val rewindedOffsets = Map( + p1 -> 0L ) - } - "rewind to endOffsets for uncommitted partitions when offsetsForTimes return null offsets " in new ctx { - givenCommittedOffsets(partitions)(Map(p2 -> randomInt, p3 -> randomInt)) - givenPositions(p3 -> p3Pos) - givenEndOffsets(partitions, timeout)(Map(p1 -> p1Pos)) - givenOffsetsForTimes(Set(p1))(p1 -> None /*kafka SDK returned null*/) + there was + one(offsetOps).commit( + missingOffsets ++ rewindedOffsets, + timeout + ) + } - committer.initializeOffsets(partitions) + "rewind to endOffsets for uncommitted partitions when offsetsForTimes return null offsets " in + new ctx { + givenCommittedOffsets(partitions)(Map(p2 -> randomInt, p3 -> randomInt)) + givenPositions(p3 -> p3Pos) + givenEndOffsets(partitions, timeout)(Map(p1 -> p1Pos)) + givenOffsetsForTimes(Set(p1))(p1 -> None /*kafka SDK returned null*/ ) - val committedOffsets = Map( - p1 -> p1Pos, - ) + committer.initializeOffsets(partitions) - there was - one(offsetOps).commit( - committedOffsets, - timeout + val committedOffsets = Map( + p1 -> p1Pos ) - } - "not rewind uncommitted offsets when offset reset is earliest" in new ctx(offsetReset = OffsetReset.Earliest) { - givenCommittedOffsets(partitions)(Map(p2 -> randomInt)) - givenPositions(p2 -> p2Pos, p3 -> p3Pos) - givenOffsetsForTimes(epochTimeToRewind, p1 -> 0L, p2 -> 1L) + there was + one(offsetOps).commit( + committedOffsets, + timeout + ) + } - committer.initializeOffsets(partitions) + "not rewind uncommitted offsets when offset reset is earliest" in + new ctx(offsetReset = OffsetReset.Earliest) { + givenCommittedOffsets(partitions)(Map(p2 -> randomInt)) + givenPositions(p2 -> p2Pos, p3 -> p3Pos) + givenOffsetsForTimes(epochTimeToRewind, p1 -> 0L, p2 -> 1L) - val missingOffsets = Map( - p1 -> p1Pos, - p3 -> p3Pos - ) + committer.initializeOffsets(partitions) - val rewindedOffsets = Map( - p1 -> 0L, - ) + val missingOffsets = Map( + p1 -> p1Pos, + p3 -> p3Pos + ) - there was - one(offsetOps).commit( - missingOffsets ++ rewindedOffsets, - timeout + val rewindedOffsets = Map( + p1 -> 0L ) - } + there was + one(offsetOps).commit( + missingOffsets ++ rewindedOffsets, + timeout + ) + } class ctx(val seekTo: Map[TopicPartition, SeekTo] = Map.empty, offsetReset: OffsetReset = OffsetReset.Latest) extends Scope { private val metricsLogRef = new AtomicReference(Seq.empty[GreyhoundMetric]) @@ -256,4 +258,4 @@ class OffsetsInitializerTest extends SpecificationWithJUnit with Mockito { private def randomStr = Random.alphanumeric.take(5).mkString private def randomInt = Random.nextInt(200) private def randomPartition = TopicPartition(randomStr, randomInt) -} \ No newline at end of file +} diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/BUILD.bazel b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/BUILD.bazel index b02990a0..dea7a061 100644 --- a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/BUILD.bazel +++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/BUILD.bazel @@ -17,7 +17,6 @@ specs2_unit_test( "//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics", "//core/src/main/scala/com/wixpress/dst/greyhound/core/producer", "//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils", - "//core/src/test/scala/com/wixpress/dst/greyhound/core/consumer", "//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit", # "@dev_zio_izumi_reflect_2_12", "@dev_zio_zio_2_12", diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/producer/BUILD.bazel b/core/src/test/scala/com/wixpress/dst/greyhound/core/producer/BUILD.bazel index 27f12381..46310b8b 100644 --- a/core/src/test/scala/com/wixpress/dst/greyhound/core/producer/BUILD.bazel +++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/producer/BUILD.bazel @@ -16,7 +16,6 @@ specs2_unit_test( "//core/src/main/scala/com/wixpress/dst/greyhound/core/producer", "//core/src/test/resources", "//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit", - "@ch_qos_logback_logback_classic", # "@dev_zio_izumi_reflect_2_12", "@dev_zio_zio_2_12", "@dev_zio_zio_test_2_12", diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/testkit/BUILD.bazel b/core/src/test/scala/com/wixpress/dst/greyhound/core/testkit/BUILD.bazel index 17c68cef..75f65d20 100644 --- a/core/src/test/scala/com/wixpress/dst/greyhound/core/testkit/BUILD.bazel +++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/testkit/BUILD.bazel @@ -15,7 +15,6 @@ scala_library( "//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry", "//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics", "//core/src/main/scala/com/wixpress/dst/greyhound/core/producer", - "//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils", # "@dev_zio_izumi_reflect_2_12", # "@dev_zio_izumi_reflect_thirdparty_boopickle_shaded_2_12", "@dev_zio_zio_2_12", diff --git a/future-interop/src/it/scala/com/wixpress/dst/greyhound/future/BUILD.bazel b/future-interop/src/it/scala/com/wixpress/dst/greyhound/future/BUILD.bazel index 45e8069e..af2ef67c 100644 --- a/future-interop/src/it/scala/com/wixpress/dst/greyhound/future/BUILD.bazel +++ b/future-interop/src/it/scala/com/wixpress/dst/greyhound/future/BUILD.bazel @@ -21,11 +21,8 @@ specs2_ite2e_test( "//core/src/test/resources", "//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit", "//future-interop/src/main/scala/com/wixpress/dst/greyhound/future", - "//java-interop/src/main/java/com/wixpress/dst/greyhound/java", - "@ch_qos_logback_logback_classic", "@dev_zio_izumi_reflect_2_12", "@dev_zio_zio_2_12", "@dev_zio_zio_stacktracer_2_12", - "@org_apache_kafka_kafka_clients", ], )