OSS Sync | Mon Apr 03 12:24:14 UTC 2023 #550

Open · wants to merge 4 commits into base: master
@@ -10,7 +10,7 @@ import com.wixpress.dst.greyhound.core.producer.ProducerRecord
import com.wixpress.dst.greyhound.core.testkit.{BaseTestWithSharedEnv, TestMetrics}
import com.wixpress.dst.greyhound.core.zioutils.CountDownLatch
import com.wixpress.dst.greyhound.testenv.ITEnv
import com.wixpress.dst.greyhound.testenv.ITEnv.{Env, TestResources, testResources}
import com.wixpress.dst.greyhound.testenv.ITEnv.{testResources, Env, TestResources}
import org.apache.kafka.common.config.TopicConfig.{DELETE_RETENTION_MS_CONFIG, MAX_MESSAGE_BYTES_CONFIG, RETENTION_MS_CONFIG}
import org.apache.kafka.common.errors.InvalidTopicException
import org.specs2.specification.core.Fragments
@@ -83,7 +83,7 @@ class AdminClientIT extends BaseTestWithSharedEnv[Env, TestResources] {
}
}

//todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged
// todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged
// "reflect errors" in {
// val topic1 = aTopicConfig()
// val topic2 = aTopicConfig("x" * 250)
@@ -104,7 +104,7 @@ class AdminClientIT extends BaseTestWithSharedEnv[Env, TestResources] {
// created === Map(badTopic.name -> None)
// }
// }
//todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged
// todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged
// =================================================================================================================================
"ignore TopicExistsException by default" in {
val topic = aTopicConfig()
5 changes: 0 additions & 5 deletions core/src/it/scala/com/wixpress/dst/greyhound/core/BUILD.bazel
@@ -24,13 +24,8 @@ specs2_ite2e_test(
"//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils",
"//core/src/test/resources",
#"//core/src/test/scala/com/wixpress/dst/greyhound/core/consumer",
#"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
"@ch_qos_logback_logback_classic",
# "@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_2_12",
"@dev_zio_zio_test_2_12",
"@org_apache_kafka_kafka_2_12",
"@org_apache_kafka_kafka_clients",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
],
@@ -13,7 +13,6 @@ specs2_ite2e_test(
"@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_managed_2_12",
"//core/src/it/resources",
"//core/src/it/scala/com/wixpress/dst/greyhound/core",
"//core/src/it/scala/com/wixpress/dst/greyhound/testenv",
"//core/src/it/scala/com/wixpress/dst/greyhound/testkit",
"//core/src/main/scala/com/wixpress/dst/greyhound/core",
@@ -23,9 +22,7 @@ specs2_ite2e_test(
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/consumer",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
"@ch_qos_logback_logback_classic",
# "@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_2_12",
"@org_apache_kafka_kafka_clients",
@@ -13,7 +13,6 @@ specs2_ite2e_test(
"@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_managed_2_12",
"//core/src/it/resources",
"//core/src/it/scala/com/wixpress/dst/greyhound/core",
"//core/src/it/scala/com/wixpress/dst/greyhound/testenv",
"//core/src/it/scala/com/wixpress/dst/greyhound/testkit",
"//core/src/main/scala/com/wixpress/dst/greyhound/core",
@@ -23,9 +22,7 @@ specs2_ite2e_test(
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/consumer",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
"@ch_qos_logback_logback_classic",
# "@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_2_12",
"@org_apache_kafka_kafka_clients",
@@ -12,7 +12,6 @@ specs2_ite2e_test(
deps = [
"@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_managed_2_12",
"//core/src/it/scala/com/wixpress/dst/greyhound/core",
"//core/src/it/scala/com/wixpress/dst/greyhound/testenv",
"//core/src/it/scala/com/wixpress/dst/greyhound/testkit",
"//core/src/main/scala/com/wixpress/dst/greyhound/core",
@@ -21,11 +20,8 @@ specs2_ite2e_test(
"//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils",
"//core/src/test/resources",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/consumer",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
"@ch_qos_logback_logback_classic",
# "@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_2_12",
"@org_apache_kafka_kafka_clients",
@@ -13,15 +13,11 @@ scala_library(
"@dev_zio_zio_managed_2_12",
"//core/src/it/scala/com/wixpress/dst/greyhound/testkit",
"//core/src/main/scala/com/wixpress/dst/greyhound/core",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/admin",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
# "@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_2_12",
"@dev_zio_zio_test_2_12",
"@org_apache_curator_curator_test",
"@org_apache_kafka_kafka_2_12",
"@org_apache_kafka_kafka_clients",
],
)
@@ -16,7 +16,6 @@ scala_library(
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
# "//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"@dev_zio_zio_2_12",
"@dev_zio_zio_test_2_12",
"@org_apache_curator_curator_test",
"@org_apache_kafka_kafka_2_12",
"@org_apache_kafka_kafka_clients",
@@ -382,7 +382,10 @@ object UnsafeOffsetOperations {
}

override def offsetsForTimes(partitions: Set[TopicPartition], timeEpoch: Long, timeout: Duration): Map[TopicPartition, Option[Long]] =
consumer.offsetsForTimes(partitions.map(_.asKafka).map(tp => (tp, new lang.Long(timeEpoch))).toMap.asJava, timeout)
.asScala.toMap.map { case (tp, of) => TopicPartition(tp) -> (Option(of).map(_.offset())) }
consumer
.offsetsForTimes(partitions.map(_.asKafka).map(tp => (tp, new lang.Long(timeEpoch))).toMap.asJava, timeout)
.asScala
.toMap
.map { case (tp, of) => TopicPartition(tp) -> (Option(of).map(_.offset())) }
}
}
@@ -56,14 +56,14 @@ object EventLoop {
partitionsAssigned <- Promise.make[Nothing, Unit]
// TODO how to handle errors in subscribe?
rebalanceListener = listener(pausedPartitionsRef, config, dispatcher, partitionsAssigned, group, consumer, clientId, offsets)
_ <- report(SubscribingToInitialSubAndRebalanceListener(clientId, group, consumerAttributes))
_ <- report(SubscribingToInitialSubAndRebalanceListener(clientId, group, consumerAttributes))
_ <- subscribe(initialSubscription, rebalanceListener)(consumer)
running <- Ref.make[EventLoopState](Running)
_ <- report(CreatingPollOnceFiber(clientId, group, consumerAttributes))
_ <- report(CreatingPollOnceFiber(clientId, group, consumerAttributes))
fiber <- pollOnce(running, consumer, dispatcher, pausedPartitionsRef, positionsRef, offsets, config, clientId, group)
.repeatWhile(_ == true)
.forkDaemon
_ <- report(AwaitingPartitionsAssignment(clientId, group, consumerAttributes))
_ <- report(AwaitingPartitionsAssignment(clientId, group, consumerAttributes))
_ <- partitionsAssigned.await
env <- ZIO.environment[Env]
} yield (dispatcher, fiber, offsets, positionsRef, running, rebalanceListener.provideEnvironment(env))
@@ -303,9 +303,11 @@ object EventLoopMetric {

case class FailedToUpdatePositions(t: Throwable, clientId: ClientId, attributes: Map[String, String] = Map.empty) extends EventLoopMetric

case class CreatingDispatcher(clientId: ClientId, group: Group, attributes: Map[String, String], startPaused: Boolean) extends EventLoopMetric
case class CreatingDispatcher(clientId: ClientId, group: Group, attributes: Map[String, String], startPaused: Boolean)
extends EventLoopMetric

case class SubscribingToInitialSubAndRebalanceListener(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric
case class SubscribingToInitialSubAndRebalanceListener(clientId: ClientId, group: Group, attributes: Map[String, String])
extends EventLoopMetric

case class CreatingPollOnceFiber(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric

@@ -40,8 +40,10 @@ class OffsetsInitializer(
offsetOperations.pause(toPause)
val rewindUncommittedOffsets =
if (offsetResetIsEarliest || notCommitted.isEmpty || rewindUncommittedOffsetsBy.isZero) Map.empty
else offsetOperations.offsetsForTimes(notCommitted, clock.millis() - rewindUncommittedOffsetsBy.toMillis, effectiveTimeout)
.map{case (tp, maybeRewindedOffset) => (tp, maybeRewindedOffset.orElse(endOffsets.get(tp)).getOrElse(0L))}
else
offsetOperations
.offsetsForTimes(notCommitted, clock.millis() - rewindUncommittedOffsetsBy.toMillis, effectiveTimeout)
.map { case (tp, maybeRewindedOffset) => (tp, maybeRewindedOffset.orElse(endOffsets.get(tp)).getOrElse(0L)) }

val positions =
notCommitted.map(tp => tp -> offsetOperations.position(tp, effectiveTimeout)).toMap ++ toOffsets ++ rewindUncommittedOffsets
@@ -41,10 +41,14 @@ case class ReportingConsumer(clientId: ClientId, group: Group, internal: Consume
implicit trace: Trace
): UIO[DelayedRebalanceEffect] =
(report(PartitionsRevoked(clientId, group, partitions, config.consumerAttributes)) *>
rebalanceListener.onPartitionsRevoked(consumer, partitions)
.timed.tap { case (duration, _) => report(PartitionsRevokedComplete(clientId, group, partitions, config.consumerAttributes, duration.toMillis)) }
.map(_._2)
).provideEnvironment(r)
rebalanceListener
.onPartitionsRevoked(consumer, partitions)
.timed
.tap {
case (duration, _) =>
report(PartitionsRevokedComplete(clientId, group, partitions, config.consumerAttributes, duration.toMillis))
}
.map(_._2)).provideEnvironment(r)

override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[Any] =
(report(PartitionsAssigned(clientId, group, partitions, config.consumerAttributes)) *>
@@ -241,11 +245,13 @@ object ConsumerMetric {
attributes: Map[String, String] = Map.empty
) extends ConsumerMetric

case class PartitionsRevokedComplete(clientId: ClientId,
group: Group,
partitions: Set[TopicPartition],
attributes: Map[String, String] = Map.empty,
durationMs: Long) extends ConsumerMetric
case class PartitionsRevokedComplete(
clientId: ClientId,
group: Group,
partitions: Set[TopicPartition],
attributes: Map[String, String] = Map.empty,
durationMs: Long
) extends ConsumerMetric

case class SubscribeFailed(clientId: ClientId, group: Group, error: Throwable, attributes: Map[String, String] = Map.empty)
extends ConsumerMetric
@@ -1,18 +1,18 @@
package com.wixpress.dst.greyhound.core.consumer.retry

import java.time.{Instant, Duration => JavaDuration}
import java.time.{Duration => JavaDuration, Instant}
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.regex.Pattern
import com.wixpress.dst.greyhound.core.Serdes.StringSerde
import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription.{TopicPattern, Topics}
import com.wixpress.dst.greyhound.core.consumer.retry.RetryAttempt.{RetryAttemptNumber, currentTime}
import com.wixpress.dst.greyhound.core.consumer.retry.RetryAttempt.{currentTime, RetryAttemptNumber}
import com.wixpress.dst.greyhound.core.consumer.retry.RetryDecision.{NoMoreRetries, RetryWith}
import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, ConsumerSubscription}
import com.wixpress.dst.greyhound.core.consumer.retry.RetryRecordHandlerMetric.WaitingForRetry
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics.report
import com.wixpress.dst.greyhound.core.producer.ProducerRecord
import com.wixpress.dst.greyhound.core.{Group, Headers, Topic, durationDeserializer, instantDeserializer}
import com.wixpress.dst.greyhound.core.{durationDeserializer, instantDeserializer, Group, Headers, Topic}
import zio.Clock
import zio.Duration
import zio.Schedule.spaced
@@ -24,14 +24,14 @@ trait NonBlockingRetryHelper {
def retryTopicsFor(originalTopic: Topic): Set[Topic]

def retryAttempt(topic: Topic, headers: Headers, subscription: ConsumerSubscription)(
implicit trace: Trace
implicit trace: Trace
): UIO[Option[RetryAttempt]]

def retryDecision[E](
retryAttempt: Option[RetryAttempt],
record: ConsumerRecord[Chunk[Byte], Chunk[Byte]],
error: E,
subscription: ConsumerSubscription
retryAttempt: Option[RetryAttempt],
record: ConsumerRecord[Chunk[Byte], Chunk[Byte]],
error: E,
subscription: ConsumerSubscription
)(implicit trace: Trace): URIO[Any, RetryDecision]

def retrySteps = retryTopicsFor("").size
@@ -47,46 +47,44 @@ object NonBlockingRetryHelper {
.getOrElse(NonBlockingBackoffPolicy.empty)

override def retryTopicsFor(topic: Topic): Set[Topic] =
policy(topic).intervals.indices.foldLeft(Set.empty[String])((acc, attempt) =>
acc + s"$topic-$group-retry-$attempt"
)
policy(topic).intervals.indices.foldLeft(Set.empty[String])((acc, attempt) => acc + s"$topic-$group-retry-$attempt")

override def retryAttempt(topic: Topic, headers: Headers, subscription: ConsumerSubscription)(
implicit trace: Trace
implicit trace: Trace
): UIO[Option[RetryAttempt]] = {
(for {
submitted <- headers.get(RetryHeader.Submitted, instantDeserializer)
backoff <- headers.get(RetryHeader.Backoff, durationDeserializer)
originalTopic <- headers.get[String](RetryHeader.OriginalTopic, StringSerde)
} yield for {
ta <- topicAttempt(subscription, topic, originalTopic)
ta <- topicAttempt(subscription, topic, originalTopic)
TopicAttempt(originalTopic, attempt) = ta
s <- submitted
b <- backoff
s <- submitted
b <- backoff
} yield RetryAttempt(originalTopic, attempt, s, b))
.catchAll(_ => ZIO.none)
}

private def topicAttempt(
subscription: ConsumerSubscription,
topic: Topic,
originalTopicHeader: Option[String]
subscription: ConsumerSubscription,
topic: Topic,
originalTopicHeader: Option[String]
) =
subscription match {
case _: Topics => extractTopicAttempt(group, topic)
case _: Topics => extractTopicAttempt(group, topic)
case _: TopicPattern =>
extractTopicAttemptFromPatternRetryTopic(group, topic, originalTopicHeader)
}

override def retryDecision[E](
retryAttempt: Option[RetryAttempt],
record: ConsumerRecord[Chunk[Byte], Chunk[Byte]],
error: E,
subscription: ConsumerSubscription
retryAttempt: Option[RetryAttempt],
record: ConsumerRecord[Chunk[Byte], Chunk[Byte]],
error: E,
subscription: ConsumerSubscription
)(implicit trace: Trace): URIO[Any, RetryDecision] = currentTime.map(now => {
val nextRetryAttempt = retryAttempt.fold(0)(_.attempt + 1)
val originalTopic = retryAttempt.fold(record.topic)(_.originalTopic)
val retryTopic = subscription match {
val retryTopic = subscription match {
case _: TopicPattern => patternRetryTopic(group, nextRetryAttempt)
case _: Topics => fixedRetryTopic(originalTopic, group, nextRetryAttempt)
}
@@ -123,19 +121,19 @@ object NonBlockingRetryHelper {
inputTopic.split(s"-$group-retry-").toSeq match {
case Seq(topic, attempt) if Try(attempt.toInt).isSuccess =>
Some(TopicAttempt(topic, attempt.toInt))
case _ => None
case _ => None
}

private def extractTopicAttemptFromPatternRetryTopic[E](
group: Group,
inputTopic: Topic,
originalTopicHeader: Option[String]
group: Group,
inputTopic: Topic,
originalTopicHeader: Option[String]
) = {
originalTopicHeader.flatMap(originalTopic => {
inputTopic.split(s"__gh_pattern-retry-$group-attempt-").toSeq match {
case Seq(_, attempt) if Try(attempt.toInt).isSuccess =>
Some(TopicAttempt(originalTopic, attempt.toInt))
case _ => None
case _ => None
}
})
}
@@ -176,14 +174,14 @@ object RetryHeader {
}

case class RetryAttempt(
originalTopic: Topic,
attempt: RetryAttemptNumber,
submittedAt: Instant,
backoff: Duration
originalTopic: Topic,
attempt: RetryAttemptNumber,
submittedAt: Instant,
backoff: Duration
) {

def sleep(implicit trace: Trace): URIO[GreyhoundMetrics, Unit] =
(RetryUtil.sleep(submittedAt, backoff) race reportWaitingInIntervals(every = 60.seconds))
RetryUtil.sleep(submittedAt, backoff) race reportWaitingInIntervals(every = 60.seconds)

private def reportWaitingInIntervals(every: Duration) =
report(WaitingForRetry(originalTopic, attempt, submittedAt.toEpochMilli, backoff.toMillis))
@@ -49,14 +49,17 @@ private[retry] object NonBlockingRetryRecordHandler {
}

private def delayRetry(record: ConsumerRecord[_, _], awaitShutdown: TopicPartition => UIO[AwaitShutdown])(
retryAttempt: RetryAttempt) =
retryAttempt: RetryAttempt
) =
zio.Random.nextInt.flatMap(correlationId =>
report(
WaitingBeforeRetry(record.topic, retryAttempt, record.partition, record.offset, correlationId)
) *>
awaitShutdown(record.topicPartition)
.flatMap(_.interruptOnShutdown(retryAttempt.sleep))
.reporting(r => DoneWaitingBeforeRetry(record.topic, record.partition, record.offset, retryAttempt, r.duration, r.failed, correlationId))
.reporting(r =>
DoneWaitingBeforeRetry(record.topic, record.partition, record.offset, retryAttempt, r.duration, r.failed, correlationId)
)
)

override def isHandlingRetryTopicMessage(group: Group, record: ConsumerRecord[K, V]): Boolean = {