Skip to content

Commit

Permalink
Endpoint for moving subscription offsets to the latest position (#1662)
Browse files Browse the repository at this point in the history
* Move consumers offsets to the latest value before subscription removal

* Move consumers offsets to the latest positions endpoint

* Move consumers offsets to the latest offset endpoint

* UI draft for moving subscription offsets

* Display errors when missing consumer group while moving offsets

* Fix checkstyle errors

* Fix checkstyle errors

* Improve placeholder for moving offsets
  • Loading branch information
faderskd committed Jun 16, 2023
1 parent 04197ae commit c817e32
Show file tree
Hide file tree
Showing 26 changed files with 396 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.allegro.tech.hermes.api;

import javax.ws.rs.BadRequestException;
import javax.ws.rs.core.Response;

import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
Expand Down Expand Up @@ -61,7 +62,8 @@ public enum ErrorCode {
PERMISSION_DENIED(FORBIDDEN),
UNKNOWN_MIGRATION(NOT_FOUND),
INVALID_QUERY(BAD_REQUEST),
IMPLEMENTATION_ABSENT(NOT_FOUND);
IMPLEMENTATION_ABSENT(NOT_FOUND),
MOVING_SUBSCRIPTION_OFFSETS_VALIDATION_ERROR(BAD_REQUEST);

private final int httpCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ Response retransmit(@PathParam("topicName") String qualifiedTopicName,
@DefaultValue("false") @QueryParam("dryRun") boolean dryRun,
OffsetRetransmissionDate offsetRetransmissionDate);

@POST
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
@Path("/{subscriptionName}/moveOffsetsToTheEnd")
Response moveOffsetsToTheEnd(@PathParam("topicName") String qualifiedTopicName,
@PathParam("subscriptionName") String subscriptionName);

@GET
@Produces(APPLICATION_JSON)
@Path("/{subscriptionName}/events/{messageId}/trace")
Expand Down
2 changes: 2 additions & 0 deletions hermes-console/static/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
<script src="js/console/diagnostics/DiagnosticsController.js"></script>
<script src="js/console/consistency/ConsistencyRepository.js"></script>
<script src="js/console/consistency/ConsistencyController.js"></script>
<script src="js/console/subscriptionoffsets/SubsriptionService.js"></script>
<script src="js/console/subscriptionoffsets/SubscriptionOffsetsController.js"></script>
<script src="js/console/offlineretransmission/OfflineRetransmissionController.js"></script>
<script src="js/console/offlineretransmission/OfflineRetransmissionRespository.js"></script>
<script src="js/console/topic/TopicRepository.js"></script>
Expand Down
5 changes: 5 additions & 0 deletions hermes-console/static/js/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var hermes = angular.module('hermes', [
'hermes.visibility',
'hermes.mode',
'hermes.readiness',
'hermes.subscriptionOffsets',
'hermes.offlineRetransmission',
'ui.ace',
]);
Expand Down Expand Up @@ -83,6 +84,10 @@ hermes.config(['$stateProvider', '$urlRouterProvider', '$httpProvider', '$uibToo
url: '/readiness',
templateUrl: 'partials/readiness.html'
})
.state('subscriptionOffsets', {
url: '/subscription-offsets',
templateUrl: 'partials/subscriptionOffsets.html'
})
.state('search', {
url: '/search?entity&property&operator&pattern',
templateUrl: 'partials/search.html'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
var subscriptionOffsets = angular.module('hermes.subscriptionOffsets', ['hermes.subscriptionOffsets.service']);

subscriptionOffsets.controller('SubscriptionOffsetsController', ['$scope', 'SubscriptionService', 'toaster',
function ($scope, subscriptionService, toaster) {
$scope.error = null;
$scope.subscriptionFullName = '';

$scope.moveOffsets = function () {
var subscription = parseSubscription($scope.subscriptionFullName);
subscriptionService.moveOffsets(subscription.topicName, subscription.subscriptionName)
.then(function () {
clearError();
clearData();
displaySuccess();
})
.catch(function (e) {
displayError(e);
});
};

function displayError(msg) {
$scope.error = msg;
}

function clearError() {
$scope.error = null;
}

function displaySuccess() {
toaster.pop('success', 'Success', 'Offsets moved successfully');
}

function clearData() {
$scope.subscriptionFullName = "";
}

function parseSubscription(subscriptionFullName) {
var subscriptionChunks = subscriptionFullName.split('.');
var subscriptionName = subscriptionChunks.pop();
return {
topicName: subscriptionChunks.join('.'),
subscriptionName: subscriptionName,
};
}

function validate() {

}
}]);
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
var service = angular.module('hermes.subscriptionOffsets.service', ['hermes.discovery']);

repository.factory('SubscriptionService', ['DiscoveryService', '$resource',
function (discovery, $resource) {
var subscriptionOffsetsEndpoint = $resource(discovery.resolve('/topics/:topicName/subscriptions/:subscriptionName/moveOffsetsToTheEnd'), null, {
query: {
method: 'POST',
isArray: true
}
});

return {
moveOffsets: function (topicName, subscriptionName) {
return subscriptionOffsetsEndpoint.save({topicName: topicName, subscriptionName: subscriptionName}, {}).$promise;
}
};
}]);
1 change: 1 addition & 0 deletions hermes-console/static/partials/home.html
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
<li role="menuitem"><a href="#/constraints">Constraints</a></li>
<li role="menuitem"><a href="#/consistency">Consistency</a></li>
<li role="menuitem"><a href="#/readiness">Readiness</a></li>
<li role="menuitem"><a href="#/subscription-offsets">Subscription offsets</a></li>
</ul>
</div>
</div>
Expand Down
30 changes: 30 additions & 0 deletions hermes-console/static/partials/subscriptionOffsets.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<section ng-controller="SubscriptionOffsetsController">

<ol class="breadcrumb">
<li><a href="#/">home</a></li>
<li class="active">subscription-offsets</li>
</ol>

<div class="alert alert-danger" role="alert" ng-if="error">
Error during moving offsets for subscription:<br>
{{error}}
</div>

<h1>Move subscription offsets to the end</h1>
<div class="container">
<div class="row">
<form name="moveOffsetsForm" class="form col-md-10">
<div class="row">
<div class="col-md-6 padding-lr-2">
<div class="form-group">
<input name="subscriptionFullName" class="form-control" placeholder="groupName.topicName.subscriptionName"
ng-model="subscriptionFullName" required pattern="(.+)\.(.+)\.(.+)"/>
</div>
</div>
<div class="col-md-2 padding-lr-2">
<button type="submit" class="btn btn-primary" ng-disabled="moveOffsetsForm.$invalid" ng-click="moveOffsets()">Move</button>
</div>
</div>
</div>
</div>
</section>
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package pl.allegro.tech.hermes.consumers.consumer.receiver.kafka;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionHealth;
import pl.allegro.tech.hermes.api.SubscriptionMetrics;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.management.api.auth.HermesSecurityAwareRequestUser;
Expand Down Expand Up @@ -235,7 +236,7 @@ public Response retransmit(@PathParam("topicName") String qualifiedTopicName,
@Valid OffsetRetransmissionDate offsetRetransmissionDate,
@Context ContainerRequestContext requestContext) {

MultiDCOffsetChangeSummary summary = multiDCAwareService.moveOffset(
MultiDCOffsetChangeSummary summary = multiDCAwareService.retransmit(
topicService.getTopicDetails(TopicName.fromQualifiedName(qualifiedTopicName)),
subscriptionName,
offsetRetransmissionDate.getRetransmissionDate().toInstant().toEpochMilli(),
Expand All @@ -246,6 +247,20 @@ public Response retransmit(@PathParam("topicName") String qualifiedTopicName,
return Response.status(OK).entity(summary).build();
}

@POST
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
@RolesAllowed({Roles.ADMIN})
@Path("/{subscriptionName}/moveOffsetsToTheEnd")
public Response moveOffsetsToTheEnd(@PathParam("topicName") String qualifiedTopicName,
@PathParam("subscriptionName") String subscriptionName) {
TopicName topicName = fromQualifiedName(qualifiedTopicName);
multiDCAwareService.moveOffsetsToTheEnd(
topicService.getTopicDetails(topicName),
new SubscriptionName(subscriptionName, topicName));
return responseStatus(OK);
}

@GET
@Produces(APPLICATION_JSON)
@Path("/{subscriptionName}/events/{messageId}/trace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public SubscriptionRemover subscriptionRemover(Auditor auditor,
MultiDatacenterRepositoryCommandExecutor multiDatacenterRepositoryCommandExecutor,
SubscriptionOwnerCache subscriptionOwnerCache,
SubscriptionRepository subscriptionRepository) {
return new SubscriptionRemover(auditor, multiDatacenterRepositoryCommandExecutor, subscriptionOwnerCache, subscriptionRepository);
return new SubscriptionRemover(auditor, multiDatacenterRepositoryCommandExecutor,
subscriptionOwnerCache, subscriptionRepository);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaBrokerTopicManagement;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerGroupManager;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerManager;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaRawMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaSingleMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.LogEndOffsetChecker;
Expand Down Expand Up @@ -96,9 +97,9 @@ MultiDCAwareService multiDCAwareService(KafkaNamesMappers kafkaNamesMappers, Sch
new KafkaSingleMessageReader(kafkaRawMessageReader, schemaRepository, jsonAvroConverter);
return new BrokersClusterService(kafkaProperties.getQualifiedClusterName(), messageReader,
retransmissionService, brokerTopicManagement, kafkaNamesMapper,
new OffsetsAvailableChecker(consumerPool, storage),
new LogEndOffsetChecker(consumerPool),
brokerAdminClient, createConsumerGroupManager(kafkaProperties, kafkaNamesMapper));
new OffsetsAvailableChecker(consumerPool, storage), new LogEndOffsetChecker(consumerPool),
brokerAdminClient, createConsumerGroupManager(kafkaProperties, kafkaNamesMapper),
createKafkaConsumerManager(kafkaProperties, kafkaNamesMapper));
}).collect(toList());

return new MultiDCAwareService(
Expand All @@ -116,6 +117,11 @@ private ConsumerGroupManager createConsumerGroupManager(KafkaProperties kafkaPro
: new NoOpConsumerGroupManager();
}

private KafkaConsumerManager createKafkaConsumerManager(KafkaProperties kafkaProperties,
KafkaNamesMapper kafkaNamesMapper) {
return new KafkaConsumerManager(kafkaProperties, kafkaNamesMapper, kafkaProperties.getBootstrapKafkaServer());
}

private SubscriptionOffsetChangeIndicator getRepository(
List<DatacenterBoundRepositoryHolder<SubscriptionOffsetChangeIndicator>> repositories,
KafkaProperties kafkaProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ public class SubscriptionRemover {

public SubscriptionRemover(Auditor auditor,
MultiDatacenterRepositoryCommandExecutor multiDcExecutor,
SubscriptionOwnerCache subscriptionOwnerCache, SubscriptionRepository subscriptionRepository) {
SubscriptionOwnerCache subscriptionOwnerCache,
SubscriptionRepository subscriptionRepository) {
this.auditor = auditor;
this.multiDcExecutor = multiDcExecutor;
this.subscriptionOwnerCache = subscriptionOwnerCache;
this.subscriptionRepository = subscriptionRepository;
}


public void removeSubscription(TopicName topicName, String subscriptionName, RequestUser removedBy) {
auditor.beforeObjectRemoval(removedBy.getUsername(), Subscription.class.getSimpleName(), subscriptionName);
Subscription subscription = subscriptionRepository.getSubscriptionDetails(topicName, subscriptionName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void waitUntilAllSubscriptionsHasConsumersAssigned(Topic topic, Duration assignm
}

private void notifySingleSubscription(Topic topic, Instant beforeMigrationInstant, String subscriptionName, RequestUser requester) {
multiDCAwareService.moveOffset(topic, subscriptionName, beforeMigrationInstant.toEpochMilli(), false, requester);
multiDCAwareService.retransmit(topic, subscriptionName, beforeMigrationInstant.toEpochMilli(), false, requester);
}

private void waitUntilOffsetsAvailableOnAllKafkaTopics(Topic topic, Duration offsetsAvailableTimeout) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package pl.allegro.tech.hermes.management.infrastructure.kafka;

import pl.allegro.tech.hermes.api.ErrorCode;
import pl.allegro.tech.hermes.management.domain.ManagementException;

public class MovingSubscriptionOffsetsValidationException extends ManagementException {
public MovingSubscriptionOffsetsValidationException(String msg) {
super(msg);
}

@Override
public ErrorCode getCode() {
return ErrorCode.MOVING_SUBSCRIPTION_OFFSETS_VALIDATION_ERROR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public String readMessageFromPrimary(String clusterName, Topic topic, Integer pa
.readMessageFromPrimary(topic, partition, offset);
}

public MultiDCOffsetChangeSummary moveOffset(Topic topic,
public MultiDCOffsetChangeSummary retransmit(Topic topic,
String subscriptionName,
Long timestamp,
boolean dryRun,
Expand Down Expand Up @@ -145,4 +145,8 @@ public List<ConsumerGroup> describeConsumerGroups(Topic topic, String subscripti
.map(Optional::get)
.collect(toList());
}

public void moveOffsetsToTheEnd(Topic topic, SubscriptionName subscription) {
clusters.forEach(c -> c.moveOffsetsToTheEnd(topic, subscription));
}
}
Loading

0 comments on commit c817e32

Please sign in to comment.