Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
pengxiangrui127 committed Dec 2, 2023
2 parents 0b6804d + 9872daa commit a3a381f
Show file tree
Hide file tree
Showing 34 changed files with 582 additions and 305 deletions.
137 changes: 137 additions & 0 deletions pip/pip-315.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# PIP-315: Configurable max delay limit for delayed delivery

# Background knowledge
Delayed message delivery is an important feature which allows a producer to specify that a message should be delivered/consumed at a later time. Currently the broker will save a delayed message without any check. The message's `deliverAt` time is checked when the broker dispatches messages to the Consumer. If a message has a `deliverAt` time, then it is added to the `DelayedDeliveryTracker` and will be delivered later when eligible.

Delayed message delivery is only available for persistent topics, and shared/key-shared subscription types.

# Motivation
Currently there is no max delay limit so a producer can specify any delay when publishing a message.

This poses a few challenges:
1. Producer may miscalculate/misconfigure a very large delay (ex. 1,000 day instead of 100 day delay)
2. Pulsar administrators may want to limit the max allowed delay since unacked messages (ex. messages with a large delay) will be stored forever (unless TTL is configured)
3. The configured delay may be greater than the configured TTL which means the delayed message may be deleted before the `deliverAt` time (before the consumer can process it)

# Goals
The purpose of this PIP is to introduce an optional configuration to limit the max allowed delay for delayed delivery.

## In Scope
- Add broker configuration to limit the max allowed delay for delayed delivery
- Configurable at broker/topic/namespace-level

# High Level Design
We will add a configuration `maxDeliveryDelayInMillis` and if configured, the broker will check incoming delayed messages to see if the message's `deliverAt` time exceeds the configured limit. If it exceeds the limit, the broker will send an error back to the Producer.

# Detailed Design

## Design & Implementation Details

### Broker Changes
A new `maxDeliveryDelayInMillis` config will be added to the broker which is initially defaulted to 0 (disabled). The default (disabled) behavior will match the current delayed delivery behavior (no limit on delivery delay).
```
# broker.conf
delayedDeliveryMaxDeliveryDelayInMillis=0
```

This field will also be added to the existing `DelayedDeliveryPolicies` interface to support topic & namespace-level configuration:
```java
public interface DelayedDeliveryPolicies {
long getMaxDeliveryDelayInMillis();
}
```

The max delivery delay check will occur in the broker's `Producer` class inside of `checkAndStartPublish` (same place as other checks such as `isEncryptionEnabled`).

We will give a `ServerError.NotAllowedError` error if all of the following are true:
1. Sending to a persistent topic
2. Topic has `delayedDeliveryEnabled=true`
3. `MessageMetadata` `deliver_at_time` has been specified
4. Topic has `>0` value for `maxDeliveryDelayInMillis`
5. `deliver_at_time - publish_time` > `maxDeliveryDelayInMillis`

```java
// In org.apache.pulsar.broker.service.Producer#checkAndStartPublish
if (topic.isPersistent()) {
PersistentTopic pTopic = (PersistentTopic) topic;
if (pTopic.isDelayedDeliveryEnabled()) {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
if (msgMetadata.hasDeliverAtTime()) {
long maxDeliveryDelayInMillis = pTopic.getMaxDeliveryDelayInMillis();
if (maxDeliveryDelayInMillis > 0
&& msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMillis) {
cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError,
String.format("Exceeds max allowed delivery delay of %s milliseconds", maxDeliveryDelayInMillis));
cnx.completedSendOperation(false, headersAndPayload.readableBytes());
});
return false;
}
}
}
}
```

### Consumer Impact
The proposal does not involve any client changes, however it is important to note that setting a max delivery delay may impact the `Consumer` since the `Consumer` uses delayed delivery for retrying to the retry/dlq topic (ex. `reconsumeLater` API). So the max `Consumer` retry delay will be the same as the configured `maxDeliveryDelayInMillis` (if enabled).

A problem will occur if max delivery delay is configured but a `Consumer` uses a larger custom retry delay. In this scenario, the `Consumer` will actually get stuck redelivering the message as the publish to the retry topic will fail. For this scenario, a larger retry delay should be configured specifically for the Consumer's retry topic (or no delay limit should be used for retry topics).

A more elegant solution would require a protocol change (see `Alternatives` section below).

## Public-facing Changes

### Public API
The optional `maxDeliveryDelayInMillis` field will be added to the admin REST APIs for configuring topic/namespace policies:
- `POST /admin/v2/namespaces/{tenant}/{namespace}/delayedDelivery`
- `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/delayedDelivery`

And the corresponding `GET` APIs will show `maxDeliveryDelayInMillis` in the response:
- `GET /admin/v2/namespaces/{tenant}/{namespace}/delayedDelivery`
- `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/delayedDelivery`

### Configuration
Broker will have a new config in `broker.conf`:
```
# The max allowed delay for delayed delivery (in milliseconds). If the broker receives a message which exceeds this max delay, then
# it will return an error to the producer.
# The default value is 0 which means there is no limit on the max delivery delay.
delayedDeliveryMaxDeliveryDelayInMillis=0
```

### CLI
Both `CmdTopics` and `CmdNamespaces` will be updated to include this additional optional configuration.

# Backward & Forward Compatibility

## Revert
Reverting to a previous version will simply get rid of this config/limitation which is the previous behavior.

## Upgrade
We will default the value to 0/disabled (no limitation), so this is a backwards compatible change and will not cause any functional change when upgrading to this feature/version. This feature will only be applied once the config is changed.

If configured, the `maxDeliveryDelayInMillis` limitation will affect:
1. Producers who configure a longer max delivery delay (PIP-26: 2.4.0+)
2. Consumers who configure a longer retry delay when using retry topic (PIP-58: 2.6.0+)

# Alternatives
## Add delayed delivery limit check at client-side
An alternative is to add the limit check to the client-side which requires a protocol change so that client `Producer`/`Consumer` will receive the delayed delivery configurations from the broker. The client `Producer` can then throw an exception if the caller provides a delay greater than the configured limit. The client `Consumer` can more elegantly handle when the retry publish delay is greater than the configured limit as it can default to using the limit instead of being stuck waiting for the limit to be increased.

This would still require the broker-side check as someone may be using a custom client. The main benefit is being able to elegantly handle the `Consumer` retry topic scenario.

If we were to make this protocol change, then it might make sense to also have the `Producer` check the `delayedDeliveryEnabled` config. If delayed delivery is disabled and the `Producer` tries to send a delayed message, then an exception is thrown to the caller (current behavior is the broker will just deliver the message instantly and no error is provided to the `Producer` so it can be misleading).

We would also need to add the client-side checks to other supported client libraries.

Since the scope of this alternative would be quite expansive, we may want to pursue this in a follow-up PIP instead of trying to address it all at once.

# Links

<!--
Updated afterwards
-->
* Mailing List discussion thread: https://lists.apache.org/thread/285nm08842or324rxc2zy83wxgqxtcjp
* Mailing List voting thread: https://lists.apache.org/thread/gkqrfrxx74j0dmrogg3now29v1of9zm9
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ flexible messaging model and an intuitive client API.</description>
<reflections.version>0.10.2</reflections.version>
<swagger.version>1.6.2</swagger.version>
<puppycrawl.checkstyle.version>8.37</puppycrawl.checkstyle.version>
<docker-maven.version>0.42.1</docker-maven.version>
<docker-maven.version>0.43.3</docker-maven.version>
<docker.verbose>true</docker.verbose>
<typetools.version>0.5.0</typetools.version>
<protobuf3.version>3.19.6</protobuf3.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.resources.PulsarResources;
Expand All @@ -37,7 +36,6 @@
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.metadata.api.MetadataStoreException;

/**
Expand All @@ -59,20 +57,6 @@ default CompletableFuture<Boolean> isSuperUser(String role,
return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role));
}

/**
* @deprecated - Use method {@link #isSuperUser(String, AuthenticationDataSource, ServiceConfiguration)}.
* Will be removed after 2.12.
* Check if specified role is a super user
* @param role the role to check
* @return a CompletableFuture containing a boolean in which true means the role is a super user
* and false if it is not
*/
@Deprecated
default CompletableFuture<Boolean> isSuperUser(String role, ServiceConfiguration serviceConfiguration) {
Set<String> superUserRoles = serviceConfiguration.getSuperUserRoles();
return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role));
}

/**
* Check if specified role is an admin of the tenant.
* @param tenant the tenant to check
Expand Down Expand Up @@ -270,21 +254,6 @@ default CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName,
operation.toString(), tenantName)));
}

/**
* @deprecated - will be removed after 2.12. Use async variant.
*/
@Deprecated
default Boolean allowTenantOperation(String tenantName, String role, TenantOperation operation,
AuthenticationDataSource authData) {
try {
return allowTenantOperationAsync(tenantName, role, operation, authData).get();
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
}
}

/**
* Check if a given <tt>role</tt> is allowed to execute a given <tt>operation</tt> on the namespace.
*
Expand All @@ -303,23 +272,6 @@ default CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName na
+ "the Authorization provider you are using."));
}

/**
* @deprecated - will be removed after 2.12. Use async variant.
*/
@Deprecated
default Boolean allowNamespaceOperation(NamespaceName namespaceName,
String role,
NamespaceOperation operation,
AuthenticationDataSource authData) {
try {
return allowNamespaceOperationAsync(namespaceName, role, operation, authData).get();
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
}
}

/**
* Check if a given <tt>role</tt> is allowed to execute a given policy <tt>operation</tt> on the namespace.
*
Expand All @@ -340,24 +292,6 @@ default CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceN
+ "is not supported by is not supported by the Authorization provider you are using."));
}

/**
* @deprecated - will be removed after 2.12. Use async variant.
*/
@Deprecated
default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
PolicyName policy,
PolicyOperation operation,
String role,
AuthenticationDataSource authData) {
try {
return allowNamespacePolicyOperationAsync(namespaceName, policy, operation, role, authData).get();
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
}
}

/**
* Check if a given <tt>role</tt> is allowed to execute a given topic <tt>operation</tt> on the topic.
*
Expand All @@ -376,23 +310,6 @@ default CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic,
+ "provider you are using."));
}

/**
* @deprecated - will be removed after 2.12. Use async variant.
*/
@Deprecated
default Boolean allowTopicOperation(TopicName topicName,
String role,
TopicOperation operation,
AuthenticationDataSource authData) {
try {
return allowTopicOperationAsync(topicName, role, operation, authData).get();
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
}
}

/**
* Check if a given <tt>role</tt> is allowed to execute a given topic <tt>operation</tt> on topic's <tt>policy</tt>.
*
Expand All @@ -412,24 +329,6 @@ default CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName topi
+ "is not supported by the Authorization provider you are using."));
}

/**
* @deprecated - will be removed after 2.12. Use async variant.
*/
@Deprecated
default Boolean allowTopicPolicyOperation(TopicName topicName,
String role,
PolicyName policy,
PolicyOperation operation,
AuthenticationDataSource authData) {
try {
return allowTopicPolicyOperationAsync(topicName, role, policy, operation, authData).get();
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
}
}

/**
* Remove authorization-action permissions on a topic.
* @param topicName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,9 @@ protected CompletableFuture<Void> internalSetNamespaceReplicationClusters(List<S
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenApply(__ -> {
checkNotNull(clusterIds, "ClusterIds should not be null");
if (CollectionUtils.isEmpty(clusterIds)) {
throw new RestException(Status.PRECONDITION_FAILED, "ClusterIds should not be null or empty");
}
if (!namespaceName.isGlobal() && !(clusterIds.size() == 1
&& clusterIds.get(0).equals(pulsar().getConfiguration().getClusterName()))) {
throw new RestException(Status.PRECONDITION_FAILED,
Expand Down
Loading

0 comments on commit a3a381f

Please sign in to comment.