diff --git a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/ZKafkaMessagingServiceImpl.scala b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/ZKafkaMessagingServiceImpl.scala index 9180fc4d62..14e5ab0491 100644 --- a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/ZKafkaMessagingServiceImpl.scala +++ b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/ZKafkaMessagingServiceImpl.scala @@ -9,6 +9,7 @@ import zio.kafka.consumer.{ ConsumerSettings as ZKConsumerSettings, Subscription as ZKSubscription } +import zio.kafka.consumer.Consumer.{AutoOffsetStrategy, OffsetRetrieval} import zio.kafka.producer.{Producer as ZKProducer, ProducerSettings as ZKProducerSettings} import zio.kafka.serde.{Deserializer as ZKDeserializer, Serializer as ZKSerializer} @@ -80,7 +81,7 @@ class ZKafkaConsumerImpl[K, V]( .withMaxPollInterval(maxPollInterval) // Should be max.poll.records x 'max processing time per record' // 'pollTimeout' default is 50 millis. This is a ZIO Kafka property. .withPollTimeout(pollTimeout) - // .withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest)) + .withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest)) .withRebalanceSafeCommits(rebalanceSafeCommits) // .withMaxRebalanceDuration(30.seconds) )