diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ErrorCode.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ErrorCode.java
index e63c4db834..767edf98ec 100644
--- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ErrorCode.java
+++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ErrorCode.java
@@ -1,5 +1,6 @@
package pl.allegro.tech.hermes.api;
+import javax.ws.rs.BadRequestException;
import javax.ws.rs.core.Response;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
@@ -61,7 +62,8 @@ public enum ErrorCode {
PERMISSION_DENIED(FORBIDDEN),
UNKNOWN_MIGRATION(NOT_FOUND),
INVALID_QUERY(BAD_REQUEST),
- IMPLEMENTATION_ABSENT(NOT_FOUND);
+ IMPLEMENTATION_ABSENT(NOT_FOUND),
+ MOVING_SUBSCRIPTION_OFFSETS_VALIDATION_ERROR(BAD_REQUEST);
private final int httpCode;
diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/endpoints/SubscriptionEndpoint.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/endpoints/SubscriptionEndpoint.java
index 5d3bf556ba..f8316029bd 100644
--- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/endpoints/SubscriptionEndpoint.java
+++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/endpoints/SubscriptionEndpoint.java
@@ -105,6 +105,13 @@ Response retransmit(@PathParam("topicName") String qualifiedTopicName,
@DefaultValue("false") @QueryParam("dryRun") boolean dryRun,
OffsetRetransmissionDate offsetRetransmissionDate);
+ @POST
+ @Consumes(APPLICATION_JSON)
+ @Produces(APPLICATION_JSON)
+ @Path("/{subscriptionName}/moveOffsetsToTheEnd")
+ Response moveOffsetsToTheEnd(@PathParam("topicName") String qualifiedTopicName,
+ @PathParam("subscriptionName") String subscriptionName);
+
@GET
@Produces(APPLICATION_JSON)
@Path("/{subscriptionName}/events/{messageId}/trace")
diff --git a/hermes-console/static/index.html b/hermes-console/static/index.html
index 5efbe0270b..743f3ade6b 100644
--- a/hermes-console/static/index.html
+++ b/hermes-console/static/index.html
@@ -62,6 +62,8 @@
+
+
diff --git a/hermes-console/static/js/app.js b/hermes-console/static/js/app.js
index 9e28edae4d..dca2af1a57 100644
--- a/hermes-console/static/js/app.js
+++ b/hermes-console/static/js/app.js
@@ -17,6 +17,7 @@ var hermes = angular.module('hermes', [
'hermes.visibility',
'hermes.mode',
'hermes.readiness',
+ 'hermes.subscriptionOffsets',
'hermes.offlineRetransmission',
'ui.ace',
]);
@@ -83,6 +84,10 @@ hermes.config(['$stateProvider', '$urlRouterProvider', '$httpProvider', '$uibToo
url: '/readiness',
templateUrl: 'partials/readiness.html'
})
+ .state('subscriptionOffsets', {
+ url: '/subscription-offsets',
+ templateUrl: 'partials/subscriptionOffsets.html'
+ })
.state('search', {
url: '/search?entity&property&operator&pattern',
templateUrl: 'partials/search.html'
diff --git a/hermes-console/static/js/console/subscriptionoffsets/SubscriptionOffsetsController.js b/hermes-console/static/js/console/subscriptionoffsets/SubscriptionOffsetsController.js
new file mode 100644
index 0000000000..ce4bb8af52
--- /dev/null
+++ b/hermes-console/static/js/console/subscriptionoffsets/SubscriptionOffsetsController.js
@@ -0,0 +1,49 @@
+var subscriptionOffsets = angular.module('hermes.subscriptionOffsets', ['hermes.subscriptionOffsets.service']);
+
+subscriptionOffsets.controller('SubscriptionOffsetsController', ['$scope', 'SubscriptionService', 'toaster',
+ function ($scope, subscriptionService, toaster) {
+ $scope.error = null;
+ $scope.subscriptionFullName = '';
+
+ $scope.moveOffsets = function () {
+ var subscription = parseSubscription($scope.subscriptionFullName);
+ subscriptionService.moveOffsets(subscription.topicName, subscription.subscriptionName)
+ .then(function () {
+ clearError();
+ clearData();
+ displaySuccess();
+ })
+ .catch(function (e) {
+ displayError(e);
+ });
+ };
+
+ function displayError(msg) {
+ $scope.error = msg;
+ }
+
+ function clearError() {
+ $scope.error = null;
+ }
+
+ function displaySuccess() {
+ toaster.pop('success', 'Success', 'Offsets moved successfully');
+ }
+
+ function clearData() {
+ $scope.subscriptionFullName = "";
+ }
+
+ function parseSubscription(subscriptionFullName) {
+ var subscriptionChunks = subscriptionFullName.split('.');
+ var subscriptionName = subscriptionChunks.pop();
+ return {
+ topicName: subscriptionChunks.join('.'),
+ subscriptionName: subscriptionName,
+ };
+ }
+
+ function validate() {
+
+ }
+ }]);
\ No newline at end of file
diff --git a/hermes-console/static/js/console/subscriptionoffsets/SubsriptionService.js b/hermes-console/static/js/console/subscriptionoffsets/SubsriptionService.js
new file mode 100644
index 0000000000..c5d37b8baf
--- /dev/null
+++ b/hermes-console/static/js/console/subscriptionoffsets/SubsriptionService.js
@@ -0,0 +1,17 @@
+var service = angular.module('hermes.subscriptionOffsets.service', ['hermes.discovery']);
+
+repository.factory('SubscriptionService', ['DiscoveryService', '$resource',
+ function (discovery, $resource) {
+ var subscriptionOffsetsEndpoint = $resource(discovery.resolve('/topics/:topicName/subscriptions/:subscriptionName/moveOffsetsToTheEnd'), null, {
+ query: {
+ method: 'POST',
+ isArray: true
+ }
+ });
+
+ return {
+ moveOffsets: function (topicName, subscriptionName) {
+ return subscriptionOffsetsEndpoint.save({topicName: topicName, subscriptionName: subscriptionName}, {}).$promise;
+ }
+ };
+ }]);
diff --git a/hermes-console/static/partials/home.html b/hermes-console/static/partials/home.html
index 6cad42eb56..b8766357b4 100644
--- a/hermes-console/static/partials/home.html
+++ b/hermes-console/static/partials/home.html
@@ -48,6 +48,7 @@
Constraints
Consistency
Readiness
+ Subscription offsets
diff --git a/hermes-console/static/partials/subscriptionOffsets.html b/hermes-console/static/partials/subscriptionOffsets.html
new file mode 100644
index 0000000000..a2a96af6ac
--- /dev/null
+++ b/hermes-console/static/partials/subscriptionOffsets.html
@@ -0,0 +1,30 @@
+
+
+
+ - home
+ - subscription-offsets
+
+
+
+ Error during moving offsets for subscription:
+ {{error}}
+
+
+ Move subscription offsets to the end
+
+
diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaMessageReceiverFactory.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaMessageReceiverFactory.java
index fd8cf241ae..eda7c4445a 100644
--- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaMessageReceiverFactory.java
+++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaMessageReceiverFactory.java
@@ -1,6 +1,5 @@
package pl.allegro.tech.hermes.consumers.consumer.receiver.kafka;
-import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java
index baed837b6f..0698f8ccf3 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java
@@ -12,6 +12,7 @@
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionHealth;
import pl.allegro.tech.hermes.api.SubscriptionMetrics;
+import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.management.api.auth.HermesSecurityAwareRequestUser;
@@ -235,7 +236,7 @@ public Response retransmit(@PathParam("topicName") String qualifiedTopicName,
@Valid OffsetRetransmissionDate offsetRetransmissionDate,
@Context ContainerRequestContext requestContext) {
- MultiDCOffsetChangeSummary summary = multiDCAwareService.moveOffset(
+ MultiDCOffsetChangeSummary summary = multiDCAwareService.retransmit(
topicService.getTopicDetails(TopicName.fromQualifiedName(qualifiedTopicName)),
subscriptionName,
offsetRetransmissionDate.getRetransmissionDate().toInstant().toEpochMilli(),
@@ -246,6 +247,20 @@ public Response retransmit(@PathParam("topicName") String qualifiedTopicName,
return Response.status(OK).entity(summary).build();
}
+ @POST
+ @Consumes(APPLICATION_JSON)
+ @Produces(APPLICATION_JSON)
+ @RolesAllowed({Roles.ADMIN})
+ @Path("/{subscriptionName}/moveOffsetsToTheEnd")
+ public Response moveOffsetsToTheEnd(@PathParam("topicName") String qualifiedTopicName,
+ @PathParam("subscriptionName") String subscriptionName) {
+ TopicName topicName = fromQualifiedName(qualifiedTopicName);
+ multiDCAwareService.moveOffsetsToTheEnd(
+ topicService.getTopicDetails(topicName),
+ new SubscriptionName(subscriptionName, topicName));
+ return responseStatus(OK);
+ }
+
@GET
@Produces(APPLICATION_JSON)
@Path("/{subscriptionName}/events/{messageId}/trace")
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SubscriptionHealthConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SubscriptionHealthConfiguration.java
index a134d61747..a8c176c362 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SubscriptionHealthConfiguration.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SubscriptionHealthConfiguration.java
@@ -124,6 +124,7 @@ public SubscriptionRemover subscriptionRemover(Auditor auditor,
MultiDatacenterRepositoryCommandExecutor multiDatacenterRepositoryCommandExecutor,
SubscriptionOwnerCache subscriptionOwnerCache,
SubscriptionRepository subscriptionRepository) {
- return new SubscriptionRemover(auditor, multiDatacenterRepositoryCommandExecutor, subscriptionOwnerCache, subscriptionRepository);
+ return new SubscriptionRemover(auditor, multiDatacenterRepositoryCommandExecutor,
+ subscriptionOwnerCache, subscriptionRepository);
}
}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java
index cfefa59c10..cf5fcdcb7e 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java
@@ -23,6 +23,7 @@
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaBrokerTopicManagement;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerGroupManager;
+import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerManager;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaRawMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaSingleMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.LogEndOffsetChecker;
@@ -96,9 +97,9 @@ MultiDCAwareService multiDCAwareService(KafkaNamesMappers kafkaNamesMappers, Sch
new KafkaSingleMessageReader(kafkaRawMessageReader, schemaRepository, jsonAvroConverter);
return new BrokersClusterService(kafkaProperties.getQualifiedClusterName(), messageReader,
retransmissionService, brokerTopicManagement, kafkaNamesMapper,
- new OffsetsAvailableChecker(consumerPool, storage),
- new LogEndOffsetChecker(consumerPool),
- brokerAdminClient, createConsumerGroupManager(kafkaProperties, kafkaNamesMapper));
+ new OffsetsAvailableChecker(consumerPool, storage), new LogEndOffsetChecker(consumerPool),
+ brokerAdminClient, createConsumerGroupManager(kafkaProperties, kafkaNamesMapper),
+ createKafkaConsumerManager(kafkaProperties, kafkaNamesMapper));
}).collect(toList());
return new MultiDCAwareService(
@@ -116,6 +117,11 @@ private ConsumerGroupManager createConsumerGroupManager(KafkaProperties kafkaPro
: new NoOpConsumerGroupManager();
}
+ private KafkaConsumerManager createKafkaConsumerManager(KafkaProperties kafkaProperties,
+ KafkaNamesMapper kafkaNamesMapper) {
+ return new KafkaConsumerManager(kafkaProperties, kafkaNamesMapper, kafkaProperties.getBootstrapKafkaServer());
+ }
+
private SubscriptionOffsetChangeIndicator getRepository(
List> repositories,
KafkaProperties kafkaProperties) {
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionRemover.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionRemover.java
index 572c5b30fe..4e9a90edfd 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionRemover.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionRemover.java
@@ -25,14 +25,14 @@ public class SubscriptionRemover {
public SubscriptionRemover(Auditor auditor,
MultiDatacenterRepositoryCommandExecutor multiDcExecutor,
- SubscriptionOwnerCache subscriptionOwnerCache, SubscriptionRepository subscriptionRepository) {
+ SubscriptionOwnerCache subscriptionOwnerCache,
+ SubscriptionRepository subscriptionRepository) {
this.auditor = auditor;
this.multiDcExecutor = multiDcExecutor;
this.subscriptionOwnerCache = subscriptionOwnerCache;
this.subscriptionRepository = subscriptionRepository;
}
-
public void removeSubscription(TopicName topicName, String subscriptionName, RequestUser removedBy) {
auditor.beforeObjectRemoval(removedBy.getUsername(), Subscription.class.getSimpleName(), subscriptionName);
Subscription subscription = subscriptionRepository.getSubscriptionDetails(topicName, subscriptionName);
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java
index 0bf5d43e59..e2e338f6c4 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java
@@ -59,7 +59,7 @@ void waitUntilAllSubscriptionsHasConsumersAssigned(Topic topic, Duration assignm
}
private void notifySingleSubscription(Topic topic, Instant beforeMigrationInstant, String subscriptionName, RequestUser requester) {
- multiDCAwareService.moveOffset(topic, subscriptionName, beforeMigrationInstant.toEpochMilli(), false, requester);
+ multiDCAwareService.retransmit(topic, subscriptionName, beforeMigrationInstant.toEpochMilli(), false, requester);
}
private void waitUntilOffsetsAvailableOnAllKafkaTopics(Topic topic, Duration offsetsAvailableTimeout) {
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MovingSubscriptionOffsetsValidationException.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MovingSubscriptionOffsetsValidationException.java
new file mode 100644
index 0000000000..b338424e7d
--- /dev/null
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MovingSubscriptionOffsetsValidationException.java
@@ -0,0 +1,15 @@
+package pl.allegro.tech.hermes.management.infrastructure.kafka;
+
+import pl.allegro.tech.hermes.api.ErrorCode;
+import pl.allegro.tech.hermes.management.domain.ManagementException;
+
+public class MovingSubscriptionOffsetsValidationException extends ManagementException {
+ public MovingSubscriptionOffsetsValidationException(String msg) {
+ super(msg);
+ }
+
+ @Override
+ public ErrorCode getCode() {
+ return ErrorCode.MOVING_SUBSCRIPTION_OFFSETS_VALIDATION_ERROR;
+ }
+}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java
index c6405f5574..10b314ca2a 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java
@@ -59,7 +59,7 @@ public String readMessageFromPrimary(String clusterName, Topic topic, Integer pa
.readMessageFromPrimary(topic, partition, offset);
}
- public MultiDCOffsetChangeSummary moveOffset(Topic topic,
+ public MultiDCOffsetChangeSummary retransmit(Topic topic,
String subscriptionName,
Long timestamp,
boolean dryRun,
@@ -145,4 +145,8 @@ public List describeConsumerGroups(Topic topic, String subscripti
.map(Optional::get)
.collect(toList());
}
+
+ public void moveOffsetsToTheEnd(Topic topic, SubscriptionName subscription) {
+ clusters.forEach(c -> c.moveOffsetsToTheEnd(topic, subscription));
+ }
}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java
index 5f51492378..d9ceed57be 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java
@@ -1,12 +1,18 @@
package pl.allegro.tech.hermes.management.infrastructure.kafka.service;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.ConsumerGroup;
import pl.allegro.tech.hermes.api.Subscription;
+import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
@@ -14,17 +20,24 @@
import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager;
import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement;
import pl.allegro.tech.hermes.management.domain.topic.SingleMessageReader;
+import pl.allegro.tech.hermes.management.infrastructure.kafka.MovingSubscriptionOffsetsValidationException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+
public class BrokersClusterService {
private static final Logger logger = LoggerFactory.getLogger(BrokersClusterService.class);
@@ -38,12 +51,14 @@ public class BrokersClusterService {
private final ConsumerGroupsDescriber consumerGroupsDescriber;
private final AdminClient adminClient;
private final ConsumerGroupManager consumerGroupManager;
+ private final KafkaConsumerManager kafkaConsumerManager;
public BrokersClusterService(String clusterName, SingleMessageReader singleMessageReader,
RetransmissionService retransmissionService, BrokerTopicManagement brokerTopicManagement,
KafkaNamesMapper kafkaNamesMapper, OffsetsAvailableChecker offsetsAvailableChecker,
LogEndOffsetChecker logEndOffsetChecker, AdminClient adminClient,
- ConsumerGroupManager consumerGroupManager) {
+ ConsumerGroupManager consumerGroupManager,
+ KafkaConsumerManager kafkaConsumerManager) {
this.clusterName = clusterName;
this.singleMessageReader = singleMessageReader;
this.retransmissionService = retransmissionService;
@@ -58,6 +73,7 @@ public BrokersClusterService(String clusterName, SingleMessageReader singleMessa
);
this.adminClient = adminClient;
this.consumerGroupManager = consumerGroupManager;
+ this.kafkaConsumerManager = kafkaConsumerManager;
}
public String getClusterName() {
@@ -123,6 +139,23 @@ public Optional describeConsumerGroup(Topic topic, String subscri
return consumerGroupsDescriber.describeConsumerGroup(topic, subscriptionName);
}
+ public void moveOffsetsToTheEnd(Topic topic, SubscriptionName subscription) {
+ validateIfOffsetsCanBeMoved(topic, subscription);
+
+ KafkaConsumer consumer = kafkaConsumerManager.createConsumer(subscription);
+ String kafkaTopicName = kafkaNamesMapper.toKafkaTopics(topic).getPrimary().name().asString();
+ Set topicPartitions = getTopicPartitions(consumer, kafkaTopicName);
+ consumer.assign(topicPartitions);
+
+ Map endOffsets = consumer.endOffsets(topicPartitions);
+ Map endOffsetsMetadata = buildOffsetsMetadata(endOffsets);
+ consumer.commitSync(endOffsetsMetadata);
+ consumer.close();
+
+ logger.info("Successfully moved offset to the end position for subscription {} and consumer group {}",
+ subscription.getQualifiedName(), kafkaNamesMapper.toConsumerGroupId(subscription));
+ }
+
private int numberOfAssignmentsForConsumersGroups(List consumerGroupsIds) throws ExecutionException, InterruptedException {
Collection consumerGroupsDescriptions =
adminClient.describeConsumerGroups(consumerGroupsIds).all().get().values();
@@ -131,6 +164,21 @@ private int numberOfAssignmentsForConsumersGroups(List consumerGroupsIds
.collect(Collectors.toList()).size();
}
+ private void validateIfOffsetsCanBeMoved(Topic topic, SubscriptionName subscription) {
+ describeConsumerGroup(topic, subscription.getName())
+ .ifPresentOrElse(
+ group -> {
+ if (!group.getMembers().isEmpty()) {
+ String s = format("Consumer group %s for subscription %s has still active members.",
+ group.getGroupId(), subscription.getQualifiedName());
+ throw new MovingSubscriptionOffsetsValidationException(s);
+ }
+ }, () -> {
+ String s = format("No consumer group for subscription %s exists.", subscription.getQualifiedName());
+ throw new MovingSubscriptionOffsetsValidationException(s);
+ });
+ }
+
private int numberOfPartitionsForTopic(Topic topic) throws ExecutionException, InterruptedException {
List kafkaTopicsNames = kafkaNamesMapper.toKafkaTopics(topic).stream()
.map(kafkaTopic -> kafkaTopic.name().asString())
@@ -140,4 +188,16 @@ private int numberOfPartitionsForTopic(Topic topic) throws ExecutionException, I
.map(v -> v.partitions().size())
.reduce(0, Integer::sum);
}
+
+ private Set getTopicPartitions(KafkaConsumer consumer, String kafkaTopicName) {
+ return consumer.partitionsFor(kafkaTopicName).stream()
+ .map(info -> new TopicPartition(info.topic(), info.partition()))
+ .collect(toSet());
+ }
+
+ private Map buildOffsetsMetadata(Map offsets) {
+ return offsets.entrySet().stream()
+ .map(entry -> ImmutablePair.of(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+ .collect(toMap(Pair::getKey, Pair::getValue));
+ }
}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/ConsumerGroupsDescriber.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/ConsumerGroupsDescriber.java
index 4e0df28079..46d87599f5 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/ConsumerGroupsDescriber.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/ConsumerGroupsDescriber.java
@@ -4,6 +4,7 @@
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -17,6 +18,7 @@
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.KafkaTopicName;
import pl.allegro.tech.hermes.common.kafka.KafkaTopics;
+import pl.allegro.tech.hermes.management.infrastructure.kafka.BrokersClusterCommunicationException;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -53,7 +55,7 @@ Optional describeConsumerGroup(Topic topic, String subscriptionNa
return describeConsumerGroup(consumerGroupId, kafkaTopics);
} catch (Exception e) {
logger.error("Failed to describe group with id: {}", consumerGroupId.asString(), e);
- return Optional.empty();
+ throw new BrokersClusterCommunicationException(e);
}
}
@@ -73,7 +75,9 @@ private Optional describeConsumerGroup(ConsumerGroupId consumerGr
.stream()
.findFirst();
- return description.map(d -> getKafkaConsumerGroup(topicPartitionOffsets, kafkaTopicContentTypes, d));
+ return description
+ .map(d -> d.state() != ConsumerGroupState.DEAD ? d : null)
+ .map(d -> getKafkaConsumerGroup(topicPartitionOffsets, kafkaTopicContentTypes, d));
}
private ConsumerGroup getKafkaConsumerGroup(Map topicPartitionOffsets,
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaConsumerGroupManager.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaConsumerGroupManager.java
index 8b235b9c15..f806b8d7ef 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaConsumerGroupManager.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaConsumerGroupManager.java
@@ -9,27 +9,15 @@
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
-import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.management.config.kafka.KafkaProperties;
import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
-import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
public class KafkaConsumerGroupManager implements ConsumerGroupManager {
@@ -37,8 +25,7 @@ public class KafkaConsumerGroupManager implements ConsumerGroupManager {
private final KafkaNamesMapper kafkaNamesMapper;
private final String clusterName;
- private final String bootstrapKafkaServer;
- private final KafkaProperties kafkaProperties;
+ private final KafkaConsumerManager consumerManager;
public KafkaConsumerGroupManager(KafkaNamesMapper kafkaNamesMapper,
String clusterName,
@@ -46,17 +33,14 @@ public KafkaConsumerGroupManager(KafkaNamesMapper kafkaNamesMapper,
KafkaProperties kafkaProperties) {
this.kafkaNamesMapper = kafkaNamesMapper;
this.clusterName = clusterName;
- this.bootstrapKafkaServer = bootstrapKafkaServer;
- this.kafkaProperties = kafkaProperties;
+ this.consumerManager = new KafkaConsumerManager(kafkaProperties, kafkaNamesMapper, bootstrapKafkaServer);
}
@Override
public void createConsumerGroup(Topic topic, Subscription subscription) {
logger.info("Creating consumer group for subscription {}, cluster: {}", subscription.getQualifiedName(), clusterName);
- ConsumerGroupId groupId = kafkaNamesMapper.toConsumerGroupId(subscription.getQualifiedName());
- KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties(groupId));
-
+ KafkaConsumer kafkaConsumer = consumerManager.createConsumer(subscription.getQualifiedName());
try {
String kafkaTopicName = kafkaNamesMapper.toKafkaTopics(topic).getPrimary().name().asString();
Set topicPartitions = kafkaConsumer.partitionsFor(kafkaTopicName).stream()
@@ -84,21 +68,4 @@ public void createConsumerGroup(Topic topic, Subscription subscription) {
subscription.getQualifiedName(), clusterName, e);
}
}
-
- private Properties properties(ConsumerGroupId groupId) {
- Properties props = new Properties();
- props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapKafkaServer);
- props.put(GROUP_ID_CONFIG, groupId.asString());
- props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
- props.put(REQUEST_TIMEOUT_MS_CONFIG, 5000);
- props.put(DEFAULT_API_TIMEOUT_MS_CONFIG, 5000);
- props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- if (kafkaProperties.getSasl().isEnabled()) {
- props.put(SASL_MECHANISM, kafkaProperties.getSasl().getMechanism());
- props.put(SECURITY_PROTOCOL_CONFIG, kafkaProperties.getSasl().getProtocol());
- props.put(SASL_JAAS_CONFIG, kafkaProperties.getSasl().getJaasConfig());
- }
- return props;
- }
}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaConsumerManager.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaConsumerManager.java
new file mode 100644
index 0000000000..a7374a7663
--- /dev/null
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaConsumerManager.java
@@ -0,0 +1,55 @@
+package pl.allegro.tech.hermes.management.infrastructure.kafka.service;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import pl.allegro.tech.hermes.api.SubscriptionName;
+import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
+import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
+import pl.allegro.tech.hermes.management.config.kafka.KafkaProperties;
+
+import java.util.Properties;
+
+import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
+
+public class KafkaConsumerManager {
+
+ private final KafkaNamesMapper kafkaNamesMapper;
+ private final String bootstrapKafkaServer;
+ private final KafkaProperties kafkaProperties;
+
+ public KafkaConsumerManager(KafkaProperties kafkaProperties, KafkaNamesMapper kafkaNamesMapper, String bootstrapKafkaServer) {
+ this.kafkaNamesMapper = kafkaNamesMapper;
+ this.bootstrapKafkaServer = bootstrapKafkaServer;
+ this.kafkaProperties = kafkaProperties;
+ }
+
+ public KafkaConsumer createConsumer(SubscriptionName subscription) {
+ ConsumerGroupId groupId = kafkaNamesMapper.toConsumerGroupId(subscription);
+ return new KafkaConsumer<>(properties(groupId));
+ }
+
+ private Properties properties(ConsumerGroupId groupId) {
+ Properties props = new Properties();
+ props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapKafkaServer);
+ props.put(GROUP_ID_CONFIG, groupId.asString());
+ props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
+ props.put(REQUEST_TIMEOUT_MS_CONFIG, 5000);
+ props.put(DEFAULT_API_TIMEOUT_MS_CONFIG, 5000);
+ props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ if (kafkaProperties.getSasl().isEnabled()) {
+ props.put(SASL_MECHANISM, kafkaProperties.getSasl().getMechanism());
+ props.put(SECURITY_PROTOCOL_CONFIG, kafkaProperties.getSasl().getProtocol());
+ props.put(SASL_JAAS_CONFIG, kafkaProperties.getSasl().getJaasConfig());
+ }
+ return props;
+ }
+}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaRawMessageReader.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaRawMessageReader.java
index 77c20e919c..199b540693 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaRawMessageReader.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaRawMessageReader.java
@@ -51,7 +51,7 @@ byte[] readMessage(KafkaTopic topic, int partition, long offset) {
try {
kafkaConsumer.assign(Collections.singleton(topicPartition));
- kafkaConsumer.poll(0);
+ kafkaConsumer.poll(Duration.ZERO);
kafkaConsumer.seek(topicPartition, offset);
ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(pollTimeoutMillis));
for (ConsumerRecord record : records.records(topicPartition)) {
diff --git a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/endpoint/BrokerOperations.java b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/endpoint/BrokerOperations.java
index 64452f1a8a..5175686c67 100644
--- a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/endpoint/BrokerOperations.java
+++ b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/endpoint/BrokerOperations.java
@@ -1,8 +1,19 @@
package pl.allegro.tech.hermes.test.helper.endpoint;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
+import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.JsonToAvroMigrationKafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
@@ -32,10 +43,9 @@ public class BrokerOperations {
private final Map adminClients;
private final KafkaNamesMapper kafkaNamesMapper;
- public BrokerOperations(Map kafkaConnection) {
+ public BrokerOperations(Map kafkaConnection, String namespace) {
adminClients = kafkaConnection.entrySet().stream()
.collect(toMap(Map.Entry::getKey, e -> brokerAdminClient(e.getValue())));
- String namespace = "";
String namespaceSeparator = "_";
kafkaNamesMapper = new JsonToAvroMigrationKafkaNamesMapper(namespace, namespaceSeparator);
}
@@ -48,6 +58,39 @@ public void createTopic(String topicName, String brokerName) {
createTopic(topicName, adminClients.get(brokerName));
}
+ public List getTopicPartitionsOffsets(SubscriptionName subscriptionName) {
+ ConsumerGroupId consumerGroupId = kafkaNamesMapper.toConsumerGroupId(subscriptionName);
+ return adminClients.values().stream()
+ .flatMap(client -> {
+ Map currentOffsets = getTopicCurrentOffsets(consumerGroupId, client);
+ Map endOffsets = getEndOffsets(client, new ArrayList<>(currentOffsets.keySet()));
+ return currentOffsets.keySet()
+ .stream()
+ .map(partition -> new ConsumerGroupOffset(
+ currentOffsets.get(partition).offset(),
+ endOffsets.get(partition).offset())
+ );
+ }).collect(Collectors.toList());
+ }
+
+ private Map getTopicCurrentOffsets(ConsumerGroupId consumerGroupId, AdminClient client) {
+ try {
+ return client.listConsumerGroupOffsets(consumerGroupId.asString()).partitionsToOffsetAndMetadata().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Map getEndOffsets(AdminClient client, List partitions) {
+ try {
+ ListOffsetsResult listOffsetsResult = client.listOffsets(
+ partitions.stream().collect(toMap(Function.identity(), p -> OffsetSpec.latest())));
+ return listOffsetsResult.all().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private void createTopic(String topicName, AdminClient adminClient) {
Topic topic = topic(topicName).build();
kafkaNamesMapper.toKafkaTopics(topic)
@@ -90,4 +133,18 @@ private AdminClient brokerAdminClient(String bootstrapKafkaServer) {
props.put(REQUEST_TIMEOUT_MS_CONFIG, 10000);
return AdminClient.create(props);
}
+
+ public static class ConsumerGroupOffset {
+ private final long currentOffset;
+ private final long endOffset;
+
+ ConsumerGroupOffset(long currentOffset, long endOffset) {
+ this.currentOffset = currentOffset;
+ this.endOffset = endOffset;
+ }
+
+ public boolean movedToEnd() {
+ return currentOffset == endOffset;
+ }
+ }
}
diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/IntegrationTest.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/IntegrationTest.java
index fca2838e49..effc0c74e3 100644
--- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/IntegrationTest.java
+++ b/integration/src/integration/java/pl/allegro/tech/hermes/integration/IntegrationTest.java
@@ -44,7 +44,7 @@ public void initializeIntegrationTest() {
ImmutableMap.of(
PRIMARY_KAFKA_CLUSTER_NAME, kafkaClusterOne.getBootstrapServersForExternalClients(),
SECONDARY_KAFKA_CLUSTER_NAME, kafkaClusterTwo.getBootstrapServersForExternalClients()
- ));
+ ), KAFKA_NAMESPACE);
this.wait = new Waiter(management, services().zookeeper(), brokerOperations, PRIMARY_KAFKA_CLUSTER_NAME, KAFKA_NAMESPACE);
this.operations = new HermesAPIOperations(management, wait);
this.auditEvents = new AuditEventEndpoint(SharedServices.services().auditEventMock());
diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/env/EnvironmentAware.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/env/EnvironmentAware.java
index b937ea037d..e7b77f3294 100644
--- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/env/EnvironmentAware.java
+++ b/integration/src/integration/java/pl/allegro/tech/hermes/integration/env/EnvironmentAware.java
@@ -40,5 +40,5 @@ public interface EnvironmentAware {
String SECONDARY_KAFKA_CLUSTER_NAME = "secondary";
- String KAFKA_NAMESPACE = "";
+ String KAFKA_NAMESPACE = "itTest";
}
diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/management/SubscriptionManagementTest.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/management/SubscriptionManagementTest.java
index 4b705c6a74..7187c0e3f3 100644
--- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/management/SubscriptionManagementTest.java
+++ b/integration/src/integration/java/pl/allegro/tech/hermes/integration/management/SubscriptionManagementTest.java
@@ -1,6 +1,7 @@
package pl.allegro.tech.hermes.integration.management;
import com.google.common.collect.ImmutableMap;
+import com.jayway.awaitility.Duration;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -24,6 +25,7 @@
import pl.allegro.tech.hermes.integration.helper.GraphiteEndpoint;
import pl.allegro.tech.hermes.integration.shame.Unreliable;
import pl.allegro.tech.hermes.management.TestSecurityProvider;
+import pl.allegro.tech.hermes.test.helper.endpoint.BrokerOperations.ConsumerGroupOffset;
import pl.allegro.tech.hermes.test.helper.endpoint.RemoteServiceEndpoint;
import pl.allegro.tech.hermes.test.helper.message.TestMessage;
@@ -38,6 +40,7 @@
import javax.ws.rs.core.Response;
import static com.jayway.awaitility.Awaitility.await;
+import static com.jayway.awaitility.Awaitility.waitAtMost;
import static java.net.URI.create;
import static javax.ws.rs.client.ClientBuilder.newClient;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
@@ -283,6 +286,37 @@ public void shouldRemoveSubscription() {
"subscription");
}
+ @Test
+ public void shouldMoveOffsetsToTheEnd() {
+ // given
+ Topic topic = operations.buildTopic(randomTopic("moveSubscriptionOffsets", "topic").build());
+ Subscription subscription = subscription(topic.getQualifiedName(), "subscription", remoteService.getUrl())
+ // prevents discarding messages and moving offsets
+ .withSubscriptionPolicy(SubscriptionPolicy.create(Map.of("messageTtl", 3600)))
+ .build();
+ management.subscription().create(subscription.getQualifiedTopicName(), subscription);
+ List messages = List.of(MESSAGE.body(), MESSAGE.body(), MESSAGE.body(), MESSAGE.body());
+
+ // prevents from moving offsets during messages sending
+ remoteService.setReturnedStatusCode(503);
+ remoteService.expectMessages(messages);
+ publishMessages(topic.getQualifiedName(), messages);
+ remoteService.waitUntilReceived();
+
+ assertThat(allConsumerGroupOffsetsMovedToTheEnd(subscription)).isFalse();
+
+ management.subscription().remove(topic.getQualifiedName(), subscription.getName());
+
+ // when
+ wait.awaitAtMost(Duration.TEN_SECONDS)
+ .until(() -> management
+ .subscription()
+ .moveOffsetsToTheEnd(topic.getQualifiedName(), subscription.getName()).getStatus() == 200);
+
+ // then
+ assertThat(allConsumerGroupOffsetsMovedToTheEnd(subscription)).isTrue();
+ }
+
@Unreliable
@Test(enabled = false)
public void shouldGetEventStatus() {
@@ -790,7 +824,7 @@ private Topic createTopicWithSubscribingRestricted() {
);
}
- private static Subscription subscriptionWithInflight(Topic topic, String subscriptionName, Integer inflightSize) {
+ private static Subscription subscriptionWithInflight(Topic topic, String subscriptionName, Integer inflightSize) {
return subscription(topic, subscriptionName)
.withSubscriptionPolicy(
SubscriptionPolicy.Builder.subscriptionPolicy()
@@ -805,7 +839,16 @@ private List