Skip to content

Commit

Permalink
Configurable consumer inflight queue size (#1648)
Browse files Browse the repository at this point in the history
* set default subscription inflight size to null
* make inflight size to be configurable by admins
  • Loading branch information
moscicky authored Dec 27, 2022
1 parent 92903d3 commit d9dfb02
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.base.MoreObjects;
import pl.allegro.tech.hermes.api.constraints.AdminPermitted;
import pl.allegro.tech.hermes.api.helpers.Patch;

import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.Null;

public class SubscriptionPolicy {

Expand All @@ -16,7 +19,6 @@ public class SubscriptionPolicy {
private static final int DEFAULT_MESSAGE_BACKOFF = 100;
private static final int DEFAULT_REQUEST_TIMEOUT = 1000;
private static final int DEFAULT_SOCKET_TIMEOUT = 0;
private static final int DEFAULT_INFLIGHT_SIZE = 100;
private static final int DEFAULT_SENDING_DELAY = 0;
private static final double DEFAULT_BACKOFF_MULTIPLIER = 1;
private static final int DEFAULT_BACKOFF_MAX_INTERVAL = 600;
Expand All @@ -40,7 +42,8 @@ public class SubscriptionPolicy {
private int socketTimeout = DEFAULT_SOCKET_TIMEOUT;

@Min(1)
private int inflightSize = DEFAULT_INFLIGHT_SIZE;
@Null(groups = AdminPermitted.class)
private Integer inflightSize;

@Min(0)
@Max(5000)
Expand Down Expand Up @@ -90,7 +93,7 @@ public static SubscriptionPolicy create(Map<String, Object> properties) {
(Integer) properties.getOrDefault("socketTimeout", DEFAULT_SOCKET_TIMEOUT),
(Boolean) properties.getOrDefault("retryClientErrors", false),
(Integer) properties.getOrDefault("messageBackoff", DEFAULT_MESSAGE_BACKOFF),
(Integer) properties.getOrDefault("inflightSize", DEFAULT_INFLIGHT_SIZE),
(Integer) properties.getOrDefault("inflightSize", null),
(Integer) properties.getOrDefault("sendingDelay", DEFAULT_SENDING_DELAY),
((Number) properties.getOrDefault("backoffMultiplier", DEFAULT_BACKOFF_MULTIPLIER)).doubleValue(),
(Integer) properties.getOrDefault("backoffMaxIntervalInSec", DEFAULT_BACKOFF_MAX_INTERVAL)
Expand Down Expand Up @@ -169,6 +172,7 @@ public Integer getSocketTimeout() {
return socketTimeout;
}

@Nullable
public Integer getInflightSize() {
return inflightSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,8 @@ public SerialConsumer(ReceiverFactory messageReceiverFactory,
}

private int calculateInflightSize(Subscription subscription) {
return Math.min(
subscription.getSerialSubscriptionPolicy().getInflightSize(),
defaultInflight
);
Optional<Integer> subscriptionInflight = Optional.ofNullable(subscription.getSerialSubscriptionPolicy().getInflightSize());
return subscriptionInflight.orElse(defaultInflight);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@ class TestRequestUser implements RequestUser {

private final String username
private final boolean admin
private final boolean isOwner

TestRequestUser(String username, boolean admin) {
this(username, admin, false)
}

TestRequestUser(String username, boolean admin, boolean isOwner) {
this.username = username
this.admin = admin
this.isOwner = isOwner
}

@Override
Expand All @@ -24,6 +30,6 @@ class TestRequestUser implements RequestUser {

@Override
boolean isOwner(OwnerId ownerId) {
return false
return isOwner
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package pl.allegro.tech.hermes.management.domain.subscription.validator

import pl.allegro.tech.hermes.api.Subscription
import pl.allegro.tech.hermes.api.SubscriptionPolicy
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository
import pl.allegro.tech.hermes.management.api.validator.ApiPreconditions
import pl.allegro.tech.hermes.management.domain.auth.TestRequestUser
import pl.allegro.tech.hermes.management.domain.owner.validator.OwnerIdValidator
import pl.allegro.tech.hermes.management.domain.topic.TopicService
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Subject

import javax.validation.ConstraintViolationException

import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription


class InflightSizeValidatorTest extends Specification {

@Shared
def ownerIdValidator = Stub(OwnerIdValidator)
@Shared
def topicService = Stub(TopicService)
@Shared
def subscriptionRepository = Stub(SubscriptionRepository)
@Shared
def endpointOwnershipValidator = Stub(EndpointOwnershipValidator)

@Shared
private static regularUser = new TestRequestUser("regularUser", false, true)
@Shared
private static admin = new TestRequestUser("admin", true)

@Subject
@Shared
SubscriptionValidator subscriptionValidator = new SubscriptionValidator(
ownerIdValidator,
new ApiPreconditions(),
topicService,
subscriptionRepository,
[],
endpointOwnershipValidator,
[]
)

def "creating subscription with inflight size should not be allowed for regular users"() {
given:
def subscription = subscriptionWithInflight(100)
when:
subscriptionValidator.checkCreation(subscription, regularUser)

then:
def exception = thrown(ConstraintViolationException)
exception.message == "serialSubscriptionPolicy.inflightSize: must be null"
}

def "creating subscription with inflight size should be allowed for admin users"() {
given:
def subscription = subscriptionWithInflight(100)

when:
subscriptionValidator.checkCreation(subscription, admin)

then:
noExceptionThrown()
}

def "creating subscription with inflight size less than 1 should not be allowed"() {
given:
def subscription = subscriptionWithInflight(inflightSize)
when:
subscriptionValidator.checkCreation(subscription, admin)

then:
def exception = thrown(ConstraintViolationException)
exception.message == "serialSubscriptionPolicy.inflightSize: must be greater than or equal to 1"

where:
inflightSize << [0, -1]
}

def "creating subscription without inflight size should be allowed for regular and admin users"() {
given:
def subscription = subscriptionWithInflight(null)
when:
subscriptionValidator.checkCreation(subscription, user)

then:
noExceptionThrown()

where:
user << [regularUser, admin]
}

private static Subscription subscriptionWithInflight(Integer inflightSize) {
return subscription("group.topic", "subscription")
.withSubscriptionPolicy(
SubscriptionPolicy.Builder.subscriptionPolicy()
.withInflightSize(inflightSize)
.build()
).build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class SubscriptionBuilder {

private String description = "description";

private SubscriptionPolicy serialSubscriptionPolicy = new SubscriptionPolicy(100, 10, 1000, 1000, false, 100, 100, 0, 1, 600);
private SubscriptionPolicy serialSubscriptionPolicy = new SubscriptionPolicy(100, 10, 1000, 1000, false, 100, null, 0, 1, 600);

private BatchSubscriptionPolicy batchSubscriptionPolicy;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class FilteringJsonTest extends IntegrationTest {
static final AvroUser ALICE_GREY = new AvroUser("Alice", 20, "grey");
static final AvroUser BOB_GREY = new AvroUser("Bob", 50, "grey");

private static final SubscriptionPolicy SUBSCRIPTION_POLICY = new SubscriptionPolicy(100, 2000, 1000, 1000, true, 100, 100, 0, 1, 600);
private static final SubscriptionPolicy SUBSCRIPTION_POLICY = new SubscriptionPolicy(100, 2000, 1000, 1000, true, 100, null, 0, 1, 600);

@BeforeMethod
public void initializeAlways() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,89 @@ public void shouldNotAllowUnprivilegedUserToUpdateEndpointWhenSubscribingIsRestr
);
}

@Test
public void shouldSetInflightSizeToNullByDefault() {
// given
Topic topic = createTopic();
Subscription subscription = operations.createSubscription(topic, "subscription", "http://localhost:8081/topics/test-topic");
TestSecurityProvider.setUserIsAdmin(false);

// when
Subscription response = management.subscription().get(
topic.getQualifiedName(),
subscription.getName()
);

// then
assertThat(response.getSerialSubscriptionPolicy().getInflightSize()).isNull();
}

@Test
public void shouldReturnInflightSizeWhenSetToNonNullValue() {
// given
Topic topic = createTopic();
Subscription subscription = operations.createSubscription(topic,
subscriptionWithInflight(topic, "subscription", 42)
);
TestSecurityProvider.setUserIsAdmin(false);

// when
Subscription response = management.subscription().get(
topic.getQualifiedName(),
subscription.getName()
);

// then
assertThat(response.getSerialSubscriptionPolicy().getInflightSize()).isEqualTo(42);
}

@Test
public void shouldNotAllowNonAdminUserToSetInflightSize() {
// given
Topic topic = createTopic();
Subscription subscription = operations.createSubscription(topic, "subscription", "http://localhost:8081/topics/test-topic");
TestSecurityProvider.setUserIsAdmin(false);

PatchData patchData = patchData().set("subscriptionPolicy", ImmutableMap.builder()
.put("inflightSize", 100)
.build()
).build();

// when
Response response = management.subscription().update(
topic.getQualifiedName(),
subscription.getName(),
patchData
);

//then
assertThat(response).hasStatus(BAD_REQUEST).containsMessage("Subscription.serialSubscriptionPolicy.inflightSize must be null");
}

@Test
public void shouldAllowAdminUserToSetInflightSize() {
// given
Topic topic = createTopic();
Subscription subscription = operations.createSubscription(topic, "subscription", "http://localhost:8081/topics/test-topic");
TestSecurityProvider.setUserIsAdmin(true);

PatchData patchData = patchData().set("subscriptionPolicy", ImmutableMap.builder()
.put("inflightSize", 100)
.build()
).build();

// when
Response response = management.subscription().update(
topic.getQualifiedName(),
subscription.getName(),
patchData
);

//then
assertThat(response).hasStatus(OK);
}


private Topic createTopic() {
return operations.buildTopic(
randomTopic("group", "topic")
Expand All @@ -707,6 +790,15 @@ private Topic createTopicWithSubscribingRestricted() {
);
}

private static Subscription subscriptionWithInflight(Topic topic, String subscriptionName, Integer inflightSize) {
return subscription(topic, subscriptionName)
.withSubscriptionPolicy(
SubscriptionPolicy.Builder.subscriptionPolicy()
.withInflightSize(inflightSize)
.build()
).build();
}

private List<Map<String, String>> getMessageTrace(String topic, String subscription, String messageId) {
Response response = management.subscription().getMessageTrace(topic, subscription, messageId);
return response.readEntity(new GenericType<>() {
Expand Down

0 comments on commit d9dfb02

Please sign in to comment.