diff --git a/pip/pip-315.md b/pip/pip-315.md
new file mode 100644
index 0000000000000..fd6cb3ef25727
--- /dev/null
+++ b/pip/pip-315.md
@@ -0,0 +1,137 @@
+# PIP-315: Configurable max delay limit for delayed delivery
+
+# Background knowledge
+Delayed message delivery is an important feature which allows a producer to specify that a message should be delivered/consumed at a later time. Currently the broker will save a delayed message without any check. The message's `deliverAt` time is checked when the broker dispatches messages to the Consumer. If a message has a `deliverAt` time, then it is added to the `DelayedDeliveryTracker` and will be delivered later when eligible.
+
+Delayed message delivery is only available for persistent topics, and shared/key-shared subscription types.
+
+# Motivation
+Currently there is no max delay limit so a producer can specify any delay when publishing a message.
+
+This poses a few challenges:
+1. Producer may miscalculate/misconfigure a very large delay (ex. 1,000 day instead of 100 day delay)
+2. Pulsar administrators may want to limit the max allowed delay since unacked messages (ex. messages with a large delay) will be stored forever (unless TTL is configured)
+3. The configured delay may be greater than the configured TTL which means the delayed message may be deleted before the `deliverAt` time (before the consumer can process it)
+
+# Goals
+The purpose of this PIP is to introduce an optional configuration to limit the max allowed delay for delayed delivery.
+
+## In Scope
+- Add broker configuration to limit the max allowed delay for delayed delivery
+- Configurable at broker/topic/namespace-level
+
+# High Level Design
+We will add a configuration `maxDeliveryDelayInMillis` and if configured, the broker will check incoming delayed messages to see if the message's `deliverAt` time exceeds the configured limit. If it exceeds the limit, the broker will send an error back to the Producer.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Broker Changes
+A new `maxDeliveryDelayInMillis` config will be added to the broker which is initially defaulted to 0 (disabled). The default (disabled) behavior will match the current delayed delivery behavior (no limit on delivery delay).
+```
+# broker.conf
+delayedDeliveryMaxDeliveryDelayInMillis=0
+```
+
+This field will also be added to the existing `DelayedDeliveryPolicies` interface to support topic & namespace-level configuration:
+```java
+public interface DelayedDeliveryPolicies {
+ long getMaxDeliveryDelayInMillis();
+}
+```
+
+The max delivery delay check will occur in the broker's `Producer` class inside of `checkAndStartPublish` (same place as other checks such as `isEncryptionEnabled`).
+
+We will give a `ServerError.NotAllowedError` error if all of the following are true:
+1. Sending to a persistent topic
+2. Topic has `delayedDeliveryEnabled=true`
+3. `MessageMetadata` `deliver_at_time` has been specified
+4. Topic has `>0` value for `maxDeliveryDelayInMillis`
+5. `deliver_at_time - publish_time` > `maxDeliveryDelayInMillis`
+
+```java
+// In org.apache.pulsar.broker.service.Producer#checkAndStartPublish
+if (topic.isPersistent()) {
+ PersistentTopic pTopic = (PersistentTopic) topic;
+ if (pTopic.isDelayedDeliveryEnabled()) {
+ headersAndPayload.markReaderIndex();
+ MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+ headersAndPayload.resetReaderIndex();
+ if (msgMetadata.hasDeliverAtTime()) {
+ long maxDeliveryDelayInMillis = pTopic.getMaxDeliveryDelayInMillis();
+ if (maxDeliveryDelayInMillis > 0
+ && msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMillis) {
+ cnx.execute(() -> {
+ cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError,
+ String.format("Exceeds max allowed delivery delay of %s milliseconds", maxDeliveryDelayInMillis));
+ cnx.completedSendOperation(false, headersAndPayload.readableBytes());
+ });
+ return false;
+ }
+ }
+ }
+}
+```
+
+### Consumer Impact
+The proposal does not involve any client changes, however it is important to note that setting a max delivery delay may impact the `Consumer` since the `Consumer` uses delayed delivery for retrying to the retry/dlq topic (ex. `reconsumeLater` API). So the max `Consumer` retry delay will be the same as the configured `maxDeliveryDelayInMillis` (if enabled).
+
+A problem will occur if max delivery delay is configured but a `Consumer` uses a larger custom retry delay. In this scenario, the `Consumer` will actually get stuck redelivering the message as the publish to the retry topic will fail. For this scenario, a larger retry delay should be configured specifically for the Consumer's retry topic (or no delay limit should be used for retry topics).
+
+A more elegant solution would require a protocol change (see `Alternatives` section below).
+
+## Public-facing Changes
+
+### Public API
+The optional `maxDeliveryDelayInMillis` field will be added to the admin REST APIs for configuring topic/namespace policies:
+- `POST /admin/v2/namespaces/{tenant}/{namespace}/delayedDelivery`
+- `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/delayedDelivery`
+
+And the corresponding `GET` APIs will show `maxDeliveryDelayInMillis` in the response:
+- `GET /admin/v2/namespaces/{tenant}/{namespace}/delayedDelivery`
+- `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/delayedDelivery`
+
+### Configuration
+Broker will have a new config in `broker.conf`:
+```
+# The max allowed delay for delayed delivery (in milliseconds). If the broker receives a message which exceeds this max delay, then
+# it will return an error to the producer.
+# The default value is 0 which means there is no limit on the max delivery delay.
+delayedDeliveryMaxDeliveryDelayInMillis=0
+```
+
+### CLI
+Both `CmdTopics` and `CmdNamespaces` will be updated to include this additional optional configuration.
+
+# Backward & Forward Compatibility
+
+## Revert
+Reverting to a previous version will simply get rid of this config/limitation which is the previous behavior.
+
+## Upgrade
+We will default the value to 0/disabled (no limitation), so this is a backwards compatible change and will not cause any functional change when upgrading to this feature/version. This feature will only be applied once the config is changed.
+
+If configured, the `maxDeliveryDelayInMillis` limitation will affect:
+1. Producers who configure a longer max delivery delay (PIP-26: 2.4.0+)
+2. Consumers who configure a longer retry delay when using retry topic (PIP-58: 2.6.0+)
+
+# Alternatives
+## Add delayed delivery limit check at client-side
+An alternative is to add the limit check to the client-side which requires a protocol change so that client `Producer`/`Consumer` will receive the delayed delivery configurations from the broker. The client `Producer` can then throw an exception if the caller provides a delay greater than the configured limit. The client `Consumer` can more elegantly handle when the retry publish delay is greater than the configured limit as it can default to using the limit instead of being stuck waiting for the limit to be increased.
+
+This would still require the broker-side check as someone may be using a custom client. The main benefit is being able to elegantly handle the `Consumer` retry topic scenario.
+
+If we were to make this protocol change, then it might make sense to also have the `Producer` check the `delayedDeliveryEnabled` config. If delayed delivery is disabled and the `Producer` tries to send a delayed message, then an exception is thrown to the caller (current behavior is the broker will just deliver the message instantly and no error is provided to the `Producer` so it can be misleading).
+
+We would also need to add the client-side checks to other supported client libraries.
+
+Since the scope of this alternative would be quite expansive, we may want to pursue this in a follow-up PIP instead of trying to address it all at once.
+
+# Links
+
+
+* Mailing List discussion thread: https://lists.apache.org/thread/285nm08842or324rxc2zy83wxgqxtcjp
+* Mailing List voting thread: https://lists.apache.org/thread/gkqrfrxx74j0dmrogg3now29v1of9zm9
diff --git a/pom.xml b/pom.xml
index ff12d940dfe00..2ce1407ecfd81 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,7 +159,7 @@ flexible messaging model and an intuitive client API.
0.10.2
1.6.2
8.37
- 0.42.1
+ 0.43.3
true
0.5.0
3.19.6
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 24b94efb4882f..7d25580ff92bb 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -23,7 +23,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.resources.PulsarResources;
@@ -37,7 +36,6 @@
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.metadata.api.MetadataStoreException;
/**
@@ -59,20 +57,6 @@ default CompletableFuture isSuperUser(String role,
return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role));
}
- /**
- * @deprecated - Use method {@link #isSuperUser(String, AuthenticationDataSource, ServiceConfiguration)}.
- * Will be removed after 2.12.
- * Check if specified role is a super user
- * @param role the role to check
- * @return a CompletableFuture containing a boolean in which true means the role is a super user
- * and false if it is not
- */
- @Deprecated
- default CompletableFuture isSuperUser(String role, ServiceConfiguration serviceConfiguration) {
- Set superUserRoles = serviceConfiguration.getSuperUserRoles();
- return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role));
- }
-
/**
* Check if specified role is an admin of the tenant.
* @param tenant the tenant to check
@@ -270,21 +254,6 @@ default CompletableFuture allowTenantOperationAsync(String tenantName,
operation.toString(), tenantName)));
}
- /**
- * @deprecated - will be removed after 2.12. Use async variant.
- */
- @Deprecated
- default Boolean allowTenantOperation(String tenantName, String role, TenantOperation operation,
- AuthenticationDataSource authData) {
- try {
- return allowTenantOperationAsync(tenantName, role, operation, authData).get();
- } catch (InterruptedException e) {
- throw new RestException(e);
- } catch (ExecutionException e) {
- throw new RestException(e.getCause());
- }
- }
-
/**
* Check if a given role is allowed to execute a given operation on the namespace.
*
@@ -303,23 +272,6 @@ default CompletableFuture allowNamespaceOperationAsync(NamespaceName na
+ "the Authorization provider you are using."));
}
- /**
- * @deprecated - will be removed after 2.12. Use async variant.
- */
- @Deprecated
- default Boolean allowNamespaceOperation(NamespaceName namespaceName,
- String role,
- NamespaceOperation operation,
- AuthenticationDataSource authData) {
- try {
- return allowNamespaceOperationAsync(namespaceName, role, operation, authData).get();
- } catch (InterruptedException e) {
- throw new RestException(e);
- } catch (ExecutionException e) {
- throw new RestException(e.getCause());
- }
- }
-
/**
* Check if a given role is allowed to execute a given policy operation on the namespace.
*
@@ -340,24 +292,6 @@ default CompletableFuture allowNamespacePolicyOperationAsync(NamespaceN
+ "is not supported by is not supported by the Authorization provider you are using."));
}
- /**
- * @deprecated - will be removed after 2.12. Use async variant.
- */
- @Deprecated
- default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
- PolicyName policy,
- PolicyOperation operation,
- String role,
- AuthenticationDataSource authData) {
- try {
- return allowNamespacePolicyOperationAsync(namespaceName, policy, operation, role, authData).get();
- } catch (InterruptedException e) {
- throw new RestException(e);
- } catch (ExecutionException e) {
- throw new RestException(e.getCause());
- }
- }
-
/**
* Check if a given role is allowed to execute a given topic operation on the topic.
*
@@ -376,23 +310,6 @@ default CompletableFuture allowTopicOperationAsync(TopicName topic,
+ "provider you are using."));
}
- /**
- * @deprecated - will be removed after 2.12. Use async variant.
- */
- @Deprecated
- default Boolean allowTopicOperation(TopicName topicName,
- String role,
- TopicOperation operation,
- AuthenticationDataSource authData) {
- try {
- return allowTopicOperationAsync(topicName, role, operation, authData).get();
- } catch (InterruptedException e) {
- throw new RestException(e);
- } catch (ExecutionException e) {
- throw new RestException(e.getCause());
- }
- }
-
/**
* Check if a given role is allowed to execute a given topic operation on topic's policy.
*
@@ -412,24 +329,6 @@ default CompletableFuture allowTopicPolicyOperationAsync(TopicName topi
+ "is not supported by the Authorization provider you are using."));
}
- /**
- * @deprecated - will be removed after 2.12. Use async variant.
- */
- @Deprecated
- default Boolean allowTopicPolicyOperation(TopicName topicName,
- String role,
- PolicyName policy,
- PolicyOperation operation,
- AuthenticationDataSource authData) {
- try {
- return allowTopicPolicyOperationAsync(topicName, role, policy, operation, authData).get();
- } catch (InterruptedException e) {
- throw new RestException(e);
- } catch (ExecutionException e) {
- throw new RestException(e.getCause());
- }
- }
-
/**
* Remove authorization-action permissions on a topic.
* @param topicName
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 7be1ebf3dd159..c5174991298d4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -668,7 +668,9 @@ protected CompletableFuture internalSetNamespaceReplicationClusters(List validatePoliciesReadOnlyAccessAsync())
.thenApply(__ -> {
- checkNotNull(clusterIds, "ClusterIds should not be null");
+ if (CollectionUtils.isEmpty(clusterIds)) {
+ throw new RestException(Status.PRECONDITION_FAILED, "ClusterIds should not be null or empty");
+ }
if (!namespaceName.isGlobal() && !(clusterIds.size() == 1
&& clusterIds.get(0).equals(pulsar().getConfiguration().getClusterName()))) {
throw new RestException(Status.PRECONDITION_FAILED,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0ef487c320dde..9d518ed952227 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -80,6 +80,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
+import org.apache.pulsar.broker.service.GetStatsOptions;
import org.apache.pulsar.broker.service.MessageExpirer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
@@ -1254,9 +1255,7 @@ private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncR
}
protected CompletableFuture extends TopicStats> internalGetStatsAsync(boolean authoritative,
- boolean getPreciseBacklog,
- boolean subscriptionBacklogSize,
- boolean getEarliestTimeInBacklog) {
+ GetStatsOptions getStatsOptions) {
CompletableFuture future;
if (topicName.isGlobal()) {
@@ -1268,8 +1267,7 @@ protected CompletableFuture extends TopicStats> internalGetStatsAsync(boolean
return future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenComposeAsync(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
- .thenCompose(topic -> topic.asyncGetStats(getPreciseBacklog, subscriptionBacklogSize,
- getEarliestTimeInBacklog));
+ .thenCompose(topic -> topic.asyncGetStats(getStatsOptions));
}
protected CompletableFuture internalGetInternalStatsAsync(boolean authoritative,
@@ -1402,8 +1400,7 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
}
protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative, boolean perPartition,
- boolean getPreciseBacklog, boolean subscriptionBacklogSize,
- boolean getEarliestTimeInBacklog) {
+ GetStatsOptions getStatsOptions) {
CompletableFuture future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
@@ -1419,6 +1416,14 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean
}
PartitionedTopicStatsImpl stats = new PartitionedTopicStatsImpl(partitionMetadata);
List> topicStatsFutureList = new ArrayList<>(partitionMetadata.partitions);
+ org.apache.pulsar.client.admin.GetStatsOptions statsOptions =
+ new org.apache.pulsar.client.admin.GetStatsOptions(
+ getStatsOptions.isGetPreciseBacklog(),
+ getStatsOptions.isSubscriptionBacklogSize(),
+ getStatsOptions.isGetEarliestTimeInBacklog(),
+ getStatsOptions.isExcludePublishers(),
+ getStatsOptions.isExcludeConsumers()
+ );
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName partition = topicName.getPartition(i);
topicStatsFutureList.add(
@@ -1428,13 +1433,11 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean
if (owned) {
return getTopicReferenceAsync(partition)
.thenApply(ref ->
- ref.getStats(getPreciseBacklog, subscriptionBacklogSize,
- getEarliestTimeInBacklog));
+ ref.getStats(getStatsOptions));
} else {
try {
return pulsar().getAdminClient().topics().getStatsAsync(
- partition.toString(), getPreciseBacklog, subscriptionBacklogSize,
- getEarliestTimeInBacklog);
+ partition.toString(), statsOptions);
} catch (PulsarServerException e) {
return FutureUtil.failedFuture(e);
}
@@ -3370,6 +3373,9 @@ protected CompletableFuture internalSetReplicationClusters(List cl
return validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> {
+ if (CollectionUtils.isEmpty(clusterIds)) {
+ throw new RestException(Status.PRECONDITION_FAILED, "ClusterIds should not be null or empty");
+ }
Set replicationClusters = Sets.newHashSet(clusterIds);
if (replicationClusters.contains("global")) {
throw new RestException(Status.PRECONDITION_FAILED,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 1992ea7e47799..454b8f0fac82c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -147,8 +147,13 @@ public CompletableFuture> testCompati
.thenCompose(__ -> getSchemaCompatibilityStrategyAsync())
.thenCompose(strategy -> {
String schemaId = getSchemaId();
+ final SchemaType schemaType = SchemaType.valueOf(payload.getType());
+ byte[] data = payload.getSchema().getBytes(StandardCharsets.UTF_8);
+ if (schemaType.getValue() == SchemaType.KEY_VALUE.getValue()) {
+ data = SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(data);
+ }
return pulsar().getSchemaRegistryService().isCompatible(schemaId,
- SchemaData.builder().data(payload.getSchema().getBytes(StandardCharsets.UTF_8))
+ SchemaData.builder().data(data)
.isDeleted(false)
.timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), ""))
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index d9b7430072bd2..43224248fdca0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -44,6 +44,7 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.GetStatsOptions;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
@@ -444,7 +445,9 @@ public void getStats(
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalGetStatsAsync(authoritative, getPreciseBacklog, false, false)
+ GetStatsOptions getStatsOptions =
+ new GetStatsOptions(getPreciseBacklog, false, false, false, false);
+ internalGetStatsAsync(authoritative, getStatsOptions)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
@@ -511,7 +514,8 @@ public void getPartitionedStats(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalGetPartitionedStats(asyncResponse, authoritative, perPartition, false, false, false);
+ GetStatsOptions getStatsOptions = new GetStatsOptions(false, false, false, false, false);
+ internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getStatsOptions);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 386b9749ef959..d4795393f9b03 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -214,7 +214,11 @@ public void getPartitionedStats(
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize,
@ApiParam(value = "If return the earliest time in backlog")
- @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
+ @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
+ @ApiParam(value = "If exclude the publishers")
+ @QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
+ @ApiParam(value = "If exclude the consumers")
+ @QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers) {
try {
validateTopicName(tenant, namespace, encodedTopic);
if (topicName.isPartitioned()) {
@@ -240,12 +244,19 @@ public void getPartitionedStats(
NonPersistentPartitionedTopicStatsImpl stats =
new NonPersistentPartitionedTopicStatsImpl(partitionMetadata);
List> topicStatsFutureList = new ArrayList<>();
+ org.apache.pulsar.client.admin.GetStatsOptions statsOptions =
+ new org.apache.pulsar.client.admin.GetStatsOptions(
+ getPreciseBacklog,
+ subscriptionBacklogSize,
+ getEarliestTimeInBacklog,
+ excludePublishers,
+ excludeConsumers
+ );
for (int i = 0; i < partitionMetadata.partitions; i++) {
try {
topicStatsFutureList
.add(pulsar().getAdminClient().topics().getStatsAsync(
- (topicName.getPartition(i).toString()), getPreciseBacklog,
- subscriptionBacklogSize, getEarliestTimeInBacklog));
+ (topicName.getPartition(i).toString()), statsOptions));
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9c91e8370da67..9ccbc0ecba171 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -49,6 +49,7 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.GetStatsOptions;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
@@ -1195,9 +1196,16 @@ public void getStats(
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize,
@ApiParam(value = "If return time of the earliest message in backlog")
- @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
+ @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
+ @ApiParam(value = "If exclude the publishers")
+ @QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
+ @ApiParam(value = "If exclude the consumers")
+ @QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers) {
validateTopicName(tenant, namespace, encodedTopic);
- internalGetStatsAsync(authoritative, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog)
+ GetStatsOptions getStatsOptions =
+ new GetStatsOptions(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog,
+ excludePublishers, excludeConsumers);
+ internalGetStatsAsync(authoritative, getStatsOptions)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
@@ -1297,15 +1305,20 @@ public void getPartitionedStats(
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize,
@ApiParam(value = "If return the earliest time in backlog")
- @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
+ @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
+ @ApiParam(value = "If exclude the publishers")
+ @QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
+ @ApiParam(value = "If exclude the consumers")
+ @QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers) {
try {
validateTopicName(tenant, namespace, encodedTopic);
if (topicName.isPartitioned()) {
throw new RestException(Response.Status.PRECONDITION_FAILED,
"Partitioned Topic Name should not contain '-partition-'");
}
- internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getPreciseBacklog,
- subscriptionBacklogSize, getEarliestTimeInBacklog);
+ GetStatsOptions getStatsOptions = new GetStatsOptions(getPreciseBacklog, subscriptionBacklogSize,
+ getEarliestTimeInBacklog, excludePublishers, excludeConsumers);
+ internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getStatsOptions);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index e72c805d73879..ee4fcff3ad1aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -115,6 +115,9 @@ public class Consumer {
private final ConsumerStatsImpl stats;
private final boolean isDurable;
+
+ private final boolean isPersistentTopic;
+
private static final AtomicIntegerFieldUpdater UNACKED_MESSAGES_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "unackedMessages");
private volatile int unackedMessages = 0;
@@ -172,6 +175,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.readCompacted = readCompacted;
this.consumerName = consumerName;
this.isDurable = isDurable;
+ this.isPersistentTopic = subscription.getTopic() instanceof PersistentTopic;
this.keySharedMeta = keySharedMeta;
this.cnx = cnx;
this.msgOut = new Rate();
@@ -239,6 +243,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.pendingAcks = null;
this.stats = null;
this.isDurable = false;
+ this.isPersistentTopic = false;
this.metadata = null;
this.keySharedMeta = null;
this.clientAddress = null;
@@ -1088,7 +1093,7 @@ public Subscription getSubscription() {
private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) {
int unackedMsgs = 0;
- if (Subscription.isIndividualAckMode(subType)) {
+ if (isPersistentTopic && Subscription.isIndividualAckMode(subType)) {
subscription.addUnAckedMessages(ackedMessages);
unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GetStatsOptions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GetStatsOptions.java
new file mode 100644
index 0000000000000..ec239a6c14172
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GetStatsOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+@AllArgsConstructor
+public class GetStatsOptions {
+ /**
+ * Set to true to get precise backlog, Otherwise get imprecise backlog.
+ */
+ private final boolean getPreciseBacklog;
+
+ /**
+ * Whether to get backlog size for each subscription.
+ */
+ private final boolean subscriptionBacklogSize;
+
+ /**
+ * Whether to get the earliest time in backlog.
+ */
+ private final boolean getEarliestTimeInBacklog;
+
+ /**
+ * Whether to exclude publishers.
+ */
+ private final boolean excludePublishers;
+
+ /**
+ * Whether to exclude consumers.
+ */
+ private final boolean excludeConsumers;
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 6e2eb75a79512..244f982e59b58 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -276,10 +276,14 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats
TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog);
+ TopicStatsImpl getStats(GetStatsOptions getStatsOptions);
+
CompletableFuture extends TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog);
+ CompletableFuture extends TopicStatsImpl> asyncGetStats(GetStatsOptions getStatsOptions);
+
CompletableFuture getInternalStats(boolean includeLedgerMetadata);
Position getLastPosition();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 6e9e5259027d7..6ec969c927a8c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -38,6 +38,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.GetStatsOptions;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
@@ -437,7 +438,7 @@ public boolean expireMessages(Position position) {
+ " non-persistent topic.");
}
- public NonPersistentSubscriptionStatsImpl getStats() {
+ public NonPersistentSubscriptionStatsImpl getStats(GetStatsOptions getStatsOptions) {
NonPersistentSubscriptionStatsImpl subStats = new NonPersistentSubscriptionStatsImpl();
subStats.bytesOutCounter = bytesOutFromRemovedConsumers.longValue();
subStats.msgOutCounter = msgOutFromRemovedConsumer.longValue();
@@ -446,7 +447,9 @@ public NonPersistentSubscriptionStatsImpl getStats() {
if (dispatcher != null) {
dispatcher.getConsumers().forEach(consumer -> {
ConsumerStatsImpl consumerStats = consumer.getStats();
- subStats.consumers.add(consumerStats);
+ if (!getStatsOptions.isExcludeConsumers()) {
+ subStats.consumers.add(consumerStats);
+ }
subStats.msgRateOut += consumerStats.msgRateOut;
subStats.messageAckRate += consumerStats.messageAckRate;
subStats.msgThroughputOut += consumerStats.msgThroughputOut;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 0d80de3aa6de3..6589e7e1ec79c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -56,6 +56,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.GetStatsOptions;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.StreamingStats;
@@ -87,6 +88,7 @@
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
@@ -884,10 +886,27 @@ public NonPersistentTopicStatsImpl getStats(boolean getPreciseBacklog, boolean s
}
}
+ @Override
+ public TopicStatsImpl getStats(GetStatsOptions getStatsOptions) {
+ try {
+ return asyncGetStats(getStatsOptions).get();
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("[{}] Fail to get stats", topic, e);
+ return null;
+ }
+ }
+
@Override
public CompletableFuture asyncGetStats(boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
+ GetStatsOptions getStatsOptions = new GetStatsOptions(getPreciseBacklog, subscriptionBacklogSize,
+ getEarliestTimeInBacklog, false, false);
+ return (CompletableFuture) asyncGetStats(getStatsOptions);
+ }
+
+ @Override
+ public CompletableFuture extends TopicStatsImpl> asyncGetStats(GetStatsOptions getStatsOptions) {
CompletableFuture future = new CompletableFuture<>();
NonPersistentTopicStatsImpl stats = new NonPersistentTopicStatsImpl();
@@ -900,7 +919,7 @@ public CompletableFuture asyncGetStats(boolean getP
if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
- } else {
+ } else if (!getStatsOptions.isExcludePublishers()) {
stats.addPublisher(publisherStats);
}
});
@@ -913,7 +932,7 @@ public CompletableFuture asyncGetStats(boolean getP
stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();
subscriptions.forEach((name, subscription) -> {
- NonPersistentSubscriptionStatsImpl subStats = subscription.getStats();
+ NonPersistentSubscriptionStatsImpl subStats = subscription.getStats(getStatsOptions);
stats.msgRateOut += subStats.msgRateOut;
stats.msgThroughputOut += subStats.msgThroughputOut;
@@ -1165,7 +1184,8 @@ public CompletableFuture unsubscribe(String subscriptionName) {
NonPersistentSubscription sub = subscriptions.remove(subscriptionName);
if (sub != null) {
// preserve accumulative stats form removed subscription
- SubscriptionStatsImpl stats = sub.getStats();
+ GetStatsOptions getStatsOptions = new GetStatsOptions(false, false, false, false, false);
+ SubscriptionStatsImpl stats = sub.getStats(getStatsOptions);
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 88130c3c2010c..0397eef8aa86c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -68,6 +68,7 @@
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryFilterSupport;
+import org.apache.pulsar.broker.service.GetStatsOptions;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
@@ -1099,8 +1100,7 @@ public long estimateBacklogSize() {
return cursor.getEstimatedSizeSinceMarkDeletePosition();
}
- public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscriptionBacklogSize,
- boolean getEarliestTimeInBacklog) {
+ public SubscriptionStatsImpl getStats(GetStatsOptions getStatsOptions) {
SubscriptionStatsImpl subStats = new SubscriptionStatsImpl();
subStats.lastExpireTimestamp = lastExpireTimestamp;
subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
@@ -1114,7 +1114,9 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
? ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getConsumerKeyHashRanges() : null;
dispatcher.getConsumers().forEach(consumer -> {
ConsumerStatsImpl consumerStats = consumer.getStats();
- subStats.consumers.add(consumerStats);
+ if (!getStatsOptions.isExcludeConsumers()) {
+ subStats.consumers.add(consumerStats);
+ }
subStats.msgRateOut += consumerStats.msgRateOut;
subStats.msgThroughputOut += consumerStats.msgThroughputOut;
subStats.bytesOutCounter += consumerStats.bytesOutCounter;
@@ -1164,14 +1166,14 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
subStats.msgDelayed = d.getNumberOfDelayedMessages();
}
}
- subStats.msgBacklog = getNumberOfEntriesInBacklog(getPreciseBacklog);
- if (subscriptionBacklogSize) {
+ subStats.msgBacklog = getNumberOfEntriesInBacklog(getStatsOptions.isGetPreciseBacklog());
+ if (getStatsOptions.isSubscriptionBacklogSize()) {
subStats.backlogSize = ((ManagedLedgerImpl) topic.getManagedLedger())
.getEstimatedBacklogSize((PositionImpl) cursor.getMarkDeletedPosition());
} else {
subStats.backlogSize = -1;
}
- if (getEarliestTimeInBacklog) {
+ if (getStatsOptions.isGetEarliestTimeInBacklog()) {
if (subStats.msgBacklog > 0) {
ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger());
PositionImpl markDeletedPosition = (PositionImpl) cursor.getMarkDeletedPosition();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index eaa57140c9f19..1e57debef0c33 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -111,6 +111,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.GetStatsOptions;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.StreamingStats;
@@ -1254,7 +1255,7 @@ void removeSubscription(String subscriptionName) {
PersistentSubscription sub = subscriptions.remove(subscriptionName);
if (sub != null) {
// preserve accumulative stats form removed subscription
- SubscriptionStatsImpl stats = sub.getStats(false, false, false);
+ SubscriptionStatsImpl stats = sub.getStats(new GetStatsOptions(false, false, false, false, false));
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
}
@@ -2264,9 +2265,26 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa
}
}
+ @Override
+ public TopicStatsImpl getStats(GetStatsOptions getStatsOptions) {
+ try {
+ return asyncGetStats(getStatsOptions).get();
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("[{}] Fail to get stats", topic, e);
+ return null;
+ }
+ }
+
@Override
public CompletableFuture asyncGetStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
+ GetStatsOptions getStatsOptions = new GetStatsOptions(getPreciseBacklog, subscriptionBacklogSize,
+ getEarliestTimeInBacklog, false, false);
+ return (CompletableFuture) asyncGetStats(getStatsOptions);
+ }
+
+ @Override
+ public CompletableFuture extends TopicStatsImpl> asyncGetStats(GetStatsOptions getStatsOptions) {
CompletableFuture statsFuture = new CompletableFuture<>();
TopicStatsImpl stats = new TopicStatsImpl();
@@ -2281,7 +2299,9 @@ public CompletableFuture asyncGetStats(boolean getPreciseBacklog
if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
}
- stats.addPublisher(publisherStats);
+ if (!getStatsOptions.isExcludePublishers()){
+ stats.addPublisher(publisherStats);
+ }
});
stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn);
@@ -2298,8 +2318,7 @@ public CompletableFuture asyncGetStats(boolean getPreciseBacklog
stats.committedTxnCount = txnBuffer.getCommittedTxnCount();
subscriptions.forEach((name, subscription) -> {
- SubscriptionStatsImpl subStats =
- subscription.getStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog);
+ SubscriptionStatsImpl subStats = subscription.getStats(getStatsOptions);
stats.msgRateOut += subStats.msgRateOut;
stats.msgThroughputOut += subStats.msgThroughputOut;
@@ -2359,7 +2378,7 @@ public CompletableFuture asyncGetStats(boolean getPreciseBacklog
return compactionRecord;
});
- if (getEarliestTimeInBacklog && stats.backlogSize != 0) {
+ if (getStatsOptions.isGetEarliestTimeInBacklog() && stats.backlogSize != 0) {
ledger.getEarliestMessagePublishTimeInBacklog().whenComplete((earliestTime, e) -> {
if (e != null) {
log.error("[{}] Failed to get earliest message publish time in backlog", topic, e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index cb39cc93154fb..5fa64e9f067cc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -253,6 +253,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map
}
if (m.getMessageId().compareTo(lastCompactedMessageId) <= 0) {
+ m.close();
phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise, lastCompactedMessageId);
return;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 2a49c14e35583..9a5d25fa0c867 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -1435,19 +1435,10 @@ public void testClusterIsReadyBeforeCreateTopic() throws Exception {
admin.namespaces().createNamespace(defaultTenant + "/ns2");
// By default the cluster will configure as configuration file. So the create topic operation
// will never throw exception except there is no cluster.
- admin.namespaces().setNamespaceReplicationClusters(defaultTenant + "/ns2", new HashSet());
+ admin.namespaces().setNamespaceReplicationClusters(defaultTenant + "/ns2", Sets.newHashSet(configClusterName));
- try {
- admin.topics().createPartitionedTopic(persistentPartitionedTopicName, partitions);
- Assert.fail("should have failed due to Namespace does not have any clusters configured");
- } catch (PulsarAdminException.PreconditionFailedException ignored) {
- }
-
- try {
- admin.topics().createPartitionedTopic(NonPersistentPartitionedTopicName, partitions);
- Assert.fail("should have failed due to Namespace does not have any clusters configured");
- } catch (PulsarAdminException.PreconditionFailedException ignored) {
- }
+ admin.topics().createPartitionedTopic(persistentPartitionedTopicName, partitions);
+ admin.topics().createPartitionedTopic(NonPersistentPartitionedTopicName, partitions);
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 6c854daab6d4d..0df378356703c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -77,6 +77,7 @@
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.testcontext.SpyConfig;
+import org.apache.pulsar.client.admin.GetStatsOptions;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -1045,6 +1046,89 @@ public void received(Consumer consumer, Message msg) {
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), msgBacklog - skipNumber);
}
+ @Test(dataProvider = "topicType")
+ public void testPartitionState(String topicType) throws Exception {
+ final String namespace = "prop-xyz/ns1";
+ final String partitionedTopicName = topicType + "://" + namespace + "/ds1";
+
+ admin.topics().createPartitionedTopic(partitionedTopicName, 4);
+
+ // create consumer and subscription
+ URL pulsarUrl = new URL(pulsar.getWebServiceAddress());
+ @Cleanup
+ PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ Consumer consumer = client.newConsumer().topic(partitionedTopicName)
+ .subscriptionName("my-sub").subscribe();
+
+ Producer producer = client.newProducer(Schema.BYTES)
+ .topic(partitionedTopicName)
+ .enableBatching(false)
+ .create();
+ for (int i = 0; i < 10; i++) {
+ String message = "message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ GetStatsOptions getStatsOptions = new GetStatsOptions(false, false, false, true, true);
+ PartitionedTopicStats topicStats = admin.topics().getPartitionedStats(partitionedTopicName,
+ true, getStatsOptions);
+ assertEquals(topicStats.getPublishers().size(), 0);
+ topicStats.getPartitions().forEach((k, v)-> {
+ assertEquals(v.getPublishers().size(), 0);
+ v.getSubscriptions().forEach((k1, v1)-> {
+ assertEquals(v1.getConsumers().size(), 0);
+ });
+ });
+
+ topicStats.getSubscriptions().forEach((k, v)-> {
+ assertEquals(v.getConsumers().size(), 0);
+ });
+
+ producer.close();
+ consumer.close();
+ client.close();
+ }
+
+
+ @Test(dataProvider = "topicType")
+ public void testNonPartitionState(String topicType) throws Exception {
+ final String namespace = "prop-xyz/ns1";
+ final String topicName = topicType + "://" + namespace + "/ds1";
+
+ admin.topics().createNonPartitionedTopic(topicName);
+
+ // create consumer and subscription
+ URL pulsarUrl = new URL(pulsar.getWebServiceAddress());
+ @Cleanup
+ PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ Consumer consumer = client.newConsumer().topic(topicName)
+ .subscriptionName("my-sub").subscribe();
+
+ Producer producer = client.newProducer(Schema.BYTES)
+ .topic(topicName)
+ .enableBatching(false)
+ .create();
+ for (int i = 0; i < 10; i++) {
+ String message = "message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ GetStatsOptions getStatsOptions = new GetStatsOptions(false, false, false, true, true);
+ TopicStats topicStats = admin.topics().getStats(topicName, getStatsOptions);
+
+ assertEquals(topicStats.getPublishers().size(), 0);
+
+ topicStats.getSubscriptions().forEach((k, v)-> {
+ assertEquals(v.getConsumers().size(), 0);
+ });
+
+ producer.close();
+ consumer.close();
+ client.close();
+ }
+
@Test(dataProvider = "topicNamesForAllTypes")
public void partitionedTopics(String topicType, String topicName) throws Exception {
final String namespace = "prop-xyz/ns1";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index df77c42eee73e..f2eb8e74c366d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -32,6 +32,7 @@
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
@@ -1294,6 +1295,14 @@ public void testForceDeleteNamespace() throws Exception {
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
}
+ @Test
+ public void testSetNamespaceReplicationCluters() throws Exception {
+ String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace");
+ admin.namespaces().createNamespace(namespace, 100);
+ assertThrows(PulsarAdminException.PreconditionFailedException.class,
+ () -> admin.namespaces().setNamespaceReplicationClusters(namespace, Set.of()));
+ }
+
@Test
public void testForceDeleteNamespaceNotAllowed() throws Exception {
assertFalse(pulsar.getConfiguration().isForceDeleteNamespaceAllowed());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 01e76aeb6f6d3..f66761ff95aa5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -315,7 +315,7 @@ public void testCreateSubscriptions() throws Exception {
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
- persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false);
+ persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false, false, false);
ArgumentCaptor statCaptor = ArgumentCaptor.forClass(TopicStats.class);
verify(response, timeout(5000).times(1)).resume(statCaptor.capture());
TopicStats topicStats = statCaptor.getValue();
@@ -333,7 +333,7 @@ public void testCreateSubscriptions() throws Exception {
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
- persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false);
+ persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false, false, false);
statCaptor = ArgumentCaptor.forClass(TopicStats.class);
verify(response, timeout(5000).times(1)).resume(statCaptor.capture());
topicStats = statCaptor.getValue();
@@ -352,7 +352,7 @@ public void testCreateSubscriptions() throws Exception {
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
- persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false);
+ persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false, false, false);
statCaptor = ArgumentCaptor.forClass(TopicStats.class);
verify(response, timeout(5000).times(1)).resume(statCaptor.capture());
topicStats = statCaptor.getValue();
@@ -371,7 +371,7 @@ public void testCreateSubscriptions() throws Exception {
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
- persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false);
+ persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false, false, false);
statCaptor = ArgumentCaptor.forClass(TopicStats.class);
verify(response, timeout(5000).times(1)).resume(statCaptor.capture());
topicStats = statCaptor.getValue();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 4e510a50f1098..25ca3bf1444d2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -23,6 +23,7 @@
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
@@ -2990,6 +2991,10 @@ public void testReplicatorClusterApi() throws Exception {
admin.topics().removeReplicationClusters(topic);
Awaitility.await().untilAsserted(()
-> assertNull(admin.topics().getReplicationClusters(topic, false)));
+
+ assertThrows(PulsarAdminException.PreconditionFailedException.class, () -> admin.topics().setReplicationClusters(topic, List.of()));
+ assertThrows(PulsarAdminException.PreconditionFailedException.class, () -> admin.topics().setReplicationClusters(topic, null));
+
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
index 1b2a6322cba3d..de5117c0187a7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
@@ -48,11 +48,6 @@ public CompletableFuture isSuperUser(String role,
return roleAuthorizedAsync(role);
}
- @Override
- public CompletableFuture isSuperUser(String role, ServiceConfiguration serviceConfiguration) {
- return roleAuthorizedAsync(role);
- }
-
@Override
public CompletableFuture isTenantAdmin(String tenant, String role, TenantInfo tenantInfo,
AuthenticationDataSource authenticationData) {
@@ -128,12 +123,6 @@ public CompletableFuture allowTenantOperationAsync(String tenantName, S
return roleAuthorizedAsync(role);
}
- @Override
- public Boolean allowTenantOperation(String tenantName, String role, TenantOperation operation,
- AuthenticationDataSource authData) {
- return roleAuthorized(role);
- }
-
@Override
public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName,
String role,
@@ -142,15 +131,6 @@ public CompletableFuture allowNamespaceOperationAsync(NamespaceName nam
return roleAuthorizedAsync(role);
}
- @Override
- public Boolean allowNamespaceOperation(NamespaceName namespaceName,
- String role,
- NamespaceOperation operation,
- AuthenticationDataSource authData) {
- return roleAuthorized(role);
- }
-
-
@Override
public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
PolicyName policy,
@@ -160,15 +140,6 @@ public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceNa
return roleAuthorizedAsync(role);
}
- @Override
- public Boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
- PolicyName policy,
- PolicyOperation operation,
- String role,
- AuthenticationDataSource authData) {
- return roleAuthorized(role);
- }
-
@Override
public CompletableFuture allowTopicOperationAsync(TopicName topic,
String role,
@@ -177,14 +148,6 @@ public CompletableFuture allowTopicOperationAsync(TopicName topic,
return roleAuthorizedAsync(role);
}
- @Override
- public Boolean allowTopicOperation(TopicName topicName,
- String role,
- TopicOperation operation,
- AuthenticationDataSource authData) {
- return roleAuthorized(role);
- }
-
CompletableFuture roleAuthorizedAsync(String role) {
CompletableFuture promise = new CompletableFuture<>();
try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index a2b42b5cca034..e0a13e103c647 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -2231,7 +2231,7 @@ public void testKeySharedMetadataExposedToStats() throws Exception {
sub1.addConsumer(consumer1);
consumer1.close();
- SubscriptionStatsImpl stats1 = sub1.getStats(false, false, false);
+ SubscriptionStatsImpl stats1 = sub1.getStats(new GetStatsOptions(false, false, false, false, false));
assertEquals(stats1.keySharedMode, "AUTO_SPLIT");
assertFalse(stats1.allowOutOfOrderDelivery);
@@ -2242,7 +2242,7 @@ public void testKeySharedMetadataExposedToStats() throws Exception {
sub2.addConsumer(consumer2);
consumer2.close();
- SubscriptionStatsImpl stats2 = sub2.getStats(false, false, false);
+ SubscriptionStatsImpl stats2 = sub2.getStats(new GetStatsOptions(false, false, false, false, false));
assertEquals(stats2.keySharedMode, "AUTO_SPLIT");
assertTrue(stats2.allowOutOfOrderDelivery);
@@ -2254,7 +2254,7 @@ public void testKeySharedMetadataExposedToStats() throws Exception {
sub3.addConsumer(consumer3);
consumer3.close();
- SubscriptionStatsImpl stats3 = sub3.getStats(false, false, false);
+ SubscriptionStatsImpl stats3 = sub3.getStats(new GetStatsOptions(false, false, false, false, false));
assertEquals(stats3.keySharedMode, "STICKY");
assertFalse(stats3.allowOutOfOrderDelivery);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index 2bdb24dceebd1..fbf734f331f2b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.schema;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertFalse;
@@ -46,9 +47,11 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -403,8 +406,12 @@ public void testKeyValueSchema() throws Exception {
.build(),
SchemaInfo.builder().type(SchemaType.BOOLEAN).schema(new byte[0])
.build(), KeyValueEncodingType.SEPARATED);
+ assertThrows(PulsarAdminException.ServerSideErrorException.class, () -> admin.schemas().testCompatibility(topicName, schemaInfo));
admin.schemas().createSchema(topicName, schemaInfo);
+ final IsCompatibilityResponse isCompatibilityResponse = admin.schemas().testCompatibility(topicName, schemaInfo);
+ Assert.assertTrue(isCompatibilityResponse.isCompatibility());
+
final SchemaInfoWithVersion schemaInfoWithVersion = admin.schemas().getSchemaInfoWithVersion(topicName);
Assert.assertEquals(schemaInfoWithVersion.getVersion(), 0);
@@ -413,5 +420,6 @@ public void testKeyValueSchema() throws Exception {
final Long version2 = admin.schemas().getVersionBySchema(topicName, schemaInfoWithVersion.getSchemaInfo());
Assert.assertEquals(version2, 0);
+
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index bbeee9f5a497a..f29c643a8f50b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -446,4 +446,37 @@ public void testAvgMessagesPerEntry() throws Exception {
int avgMessagesPerEntry = consumerStats.getAvgMessagesPerEntry();
assertEquals(3, avgMessagesPerEntry);
}
+
+ @Test()
+ public void testNonPersistentTopicSharedSubscriptionUnackedMessages() throws Exception {
+ final String topicName = "non-persistent://my-property/my-ns/my-topic" + UUID.randomUUID();
+ final String subName = "my-sub";
+
+ @Cleanup
+ Producer producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .create();
+ @Cleanup
+ Consumer consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ for (int i = 0; i < 5; i++) {
+ producer.send(("message-" + i).getBytes());
+ }
+ for (int i = 0; i < 5; i++) {
+ Message msg = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledge(msg);
+ }
+ TimeUnit.SECONDS.sleep(1);
+
+ TopicStats topicStats = admin.topics().getStats(topicName);
+ assertEquals(1, topicStats.getSubscriptions().size());
+ List extends ConsumerStats> consumers = topicStats.getSubscriptions().get(subName).getConsumers();
+ assertEquals(1, consumers.size());
+ assertEquals(0, consumers.get(0).getUnackedMessages());
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 9a36e0683b422..769486054ab04 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -37,7 +37,6 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
import lombok.Cleanup;
@@ -64,7 +63,6 @@
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
-import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
@@ -896,13 +894,6 @@ public void close() throws IOException {
// No-op
}
- @Override
- public CompletableFuture isSuperUser(String role,
- ServiceConfiguration serviceConfiguration) {
- Set superUserRoles = serviceConfiguration.getSuperUserRoles();
- return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role) ? true : false);
- }
-
@Override
public void initialize(ServiceConfiguration conf, PulsarResources pulsarResources) throws IOException {
this.conf = conf;
@@ -977,23 +968,12 @@ public CompletableFuture allowTenantOperationAsync(
return CompletableFuture.completedFuture(true);
}
- @Override
- public Boolean allowTenantOperation(
- String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
- return true;
- }
-
@Override
public CompletableFuture allowNamespaceOperationAsync(
NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
return CompletableFuture.completedFuture(true);
}
- @Override
- public Boolean allowNamespaceOperation(
- NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
- return null;
- }
@Override
public CompletableFuture allowTopicOperationAsync(
@@ -1008,18 +988,6 @@ public CompletableFuture allowTopicOperationAsync(
return isAuthorizedFuture;
}
-
- @Override
- public Boolean allowTopicOperation(
- TopicName topicName, String role, TopicOperation operation, AuthenticationDataSource authData) {
- try {
- return allowTopicOperationAsync(topicName, role, operation, authData).get();
- } catch (InterruptedException e) {
- throw new RestException(e);
- } catch (ExecutionException e) {
- throw new RestException(e);
- }
- }
}
/**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
index b9139dabdf021..a3759c5682165 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
@@ -30,7 +30,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
@@ -63,7 +62,6 @@
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
-import org.apache.pulsar.common.util.RestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -332,12 +330,6 @@ public CompletableFuture allowTenantOperationAsync(
return CompletableFuture.completedFuture(true);
}
- @Override
- public Boolean allowTenantOperation(
- String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
- return true;
- }
-
@Override
public CompletableFuture allowNamespaceOperationAsync(
NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
@@ -352,16 +344,6 @@ public CompletableFuture allowNamespaceOperationAsync(
return isAuthorizedFuture;
}
- @Override
- public Boolean allowNamespaceOperation(
- NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
- try {
- return allowNamespaceOperationAsync(namespaceName, role, operation, authData).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new RestException(e);
- }
- }
-
@Override
public CompletableFuture allowTopicOperationAsync(
TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) {
@@ -376,16 +358,6 @@ public CompletableFuture allowTopicOperationAsync(
return isAuthorizedFuture;
}
- @Override
- public Boolean allowTopicOperation(
- TopicName topicName, String role, TopicOperation operation, AuthenticationDataSource authData) {
- try {
- return allowTopicOperationAsync(topicName, role, operation, authData).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new RestException(e);
- }
- }
-
@Override
public CompletableFuture allowTopicPolicyOperationAsync(TopicName topic, String role,
PolicyName policy, PolicyOperation operation,
@@ -400,16 +372,6 @@ public CompletableFuture allowTopicPolicyOperationAsync(TopicName topic
return isAuthorizedFuture;
}
-
- @Override
- public Boolean allowTopicPolicyOperation(TopicName topicName, String role, PolicyName policy,
- PolicyOperation operation, AuthenticationDataSource authData) {
- try {
- return allowTopicPolicyOperationAsync(topicName, role, policy, operation, authData).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new RestException(e);
- }
- }
}
public static class ClientAuthentication implements Authentication {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 5ee12d660e031..d5a1eca51e40d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1967,9 +1967,32 @@ public void testCompactionDuplicate() throws Exception {
// Wait for phase one to complete
Thread.sleep(500);
+ Optional previousTopicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+ Assert.assertTrue(previousTopicRef.isPresent());
+ PersistentTopic previousPersistentTopic = (PersistentTopic) previousTopicRef.get();
+
// Unload topic make reader of compaction reconnect
admin.topics().unload(topic);
+ Awaitility.await().untilAsserted(() -> {
+ LongRunningProcessStatus previousLongRunningProcessStatus = previousPersistentTopic.compactionStatus();
+
+ Optional currentTopicReference = pulsar.getBrokerService().getTopicReference(topic);
+ Assert.assertTrue(currentTopicReference.isPresent());
+ PersistentTopic currentPersistentTopic = (PersistentTopic) currentTopicReference.get();
+ LongRunningProcessStatus currentLongRunningProcessStatus = currentPersistentTopic.compactionStatus();
+
+ if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR
+ && (currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.NOT_RUN
+ || currentLongRunningProcessStatus.status == LongRunningProcessStatus.Status.ERROR)) {
+ // trigger compaction again
+ admin.topics().triggerCompaction(topic);
+ Assert.assertEquals(currentLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);
+ } else if (previousLongRunningProcessStatus.status == LongRunningProcessStatus.Status.RUNNING) {
+ Assert.assertEquals(previousLongRunningProcessStatus.status, LongRunningProcessStatus.Status.SUCCESS);
+ }
+ });
+
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false);
// Compacted topic ledger should have same number of entry equals to number of unique key.
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/GetStatsOptions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/GetStatsOptions.java
index 14e99ac014ba8..6ebc365833b27 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/GetStatsOptions.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/GetStatsOptions.java
@@ -18,11 +18,13 @@
*/
package org.apache.pulsar.client.admin;
+import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
+@AllArgsConstructor
public class GetStatsOptions {
/**
* Set to true to get precise backlog, Otherwise get imprecise backlog.
@@ -38,4 +40,14 @@ public class GetStatsOptions {
* Whether to get the earliest time in backlog.
*/
private final boolean getEarliestTimeInBacklog;
+
+ /**
+ * Whether to exclude publishers.
+ */
+ private final boolean excludePublishers;
+
+ /**
+ * Whether to exclude consumers.
+ */
+ private final boolean excludeConsumers;
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 156d67e4e58b3..cace5cda7bd5b 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1139,23 +1139,26 @@ default CompletableFuture deleteAsync(String topic, boolean force) {
default TopicStats getStats(String topic, boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) throws PulsarAdminException {
GetStatsOptions getStatsOptions =
- new GetStatsOptions(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog);
+ new GetStatsOptions(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog, false, false);
return getStats(topic, getStatsOptions);
}
default TopicStats getStats(String topic, boolean getPreciseBacklog,
boolean subscriptionBacklogSize) throws PulsarAdminException {
- GetStatsOptions getStatsOptions = new GetStatsOptions(getPreciseBacklog, subscriptionBacklogSize, false);
+ GetStatsOptions getStatsOptions = new GetStatsOptions(getPreciseBacklog, subscriptionBacklogSize, false,
+ false, false);
return getStats(topic, getStatsOptions);
}
default TopicStats getStats(String topic, boolean getPreciseBacklog) throws PulsarAdminException {
- GetStatsOptions getStatsOptions = new GetStatsOptions(getPreciseBacklog, false, false);
+ GetStatsOptions getStatsOptions = new GetStatsOptions(getPreciseBacklog, false, false,
+ false, false);
return getStats(topic, getStatsOptions);
}
default TopicStats getStats(String topic) throws PulsarAdminException {
- return getStats(topic, new GetStatsOptions(false, false, false));
+ return getStats(topic, new GetStatsOptions(false, false, false,
+ false, false));
}
/**
@@ -1176,6 +1179,8 @@ default TopicStats getStats(String topic) throws PulsarAdminException {
CompletableFuture getStatsAsync(String topic, boolean getPreciseBacklog,
boolean subscriptionBacklogSize, boolean getEarliestTimeInBacklog);
+ CompletableFuture getStatsAsync(String topic, GetStatsOptions getStatsOptions);
+
default CompletableFuture getStatsAsync(String topic) {
return getStatsAsync(topic, false, false, false);
}
@@ -1346,6 +1351,9 @@ PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition, bo
boolean subscriptionBacklogSize, boolean getEarliestTimeInBacklog)
throws PulsarAdminException;
+ PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition, GetStatsOptions getStatsOptions)
+ throws PulsarAdminException;
+
default PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition) throws PulsarAdminException {
return getPartitionedStats(topic, perPartition, false, false, false);
}
@@ -1369,6 +1377,9 @@ CompletableFuture getPartitionedStatsAsync(
String topic, boolean perPartition, boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog);
+ CompletableFuture getPartitionedStatsAsync(
+ String topic, boolean perPartition, GetStatsOptions getStatsOptions);
+
default CompletableFuture getPartitionedStatsAsync(String topic, boolean perPartition) {
return getPartitionedStatsAsync(topic, perPartition, false, false, false);
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index e0c64319ea2d9..9d09d96073d9e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -591,21 +591,27 @@ public CompletableFuture> getSubscriptionsAsync(String topic) {
@Override
public TopicStats getStats(String topic, GetStatsOptions getStatsOptions) throws PulsarAdminException {
- boolean getPreciseBacklog = getStatsOptions.isGetPreciseBacklog();
- boolean subscriptionBacklogSize = getStatsOptions.isSubscriptionBacklogSize();
- boolean getEarliestTimeInBacklog = getStatsOptions.isGetEarliestTimeInBacklog();
- return sync(() -> getStatsAsync(topic, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog));
+ return sync(() -> getStatsAsync(topic, getStatsOptions));
}
@Override
public CompletableFuture getStatsAsync(String topic, boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
+ GetStatsOptions getStatsOptions = new GetStatsOptions(getPreciseBacklog, subscriptionBacklogSize,
+ getEarliestTimeInBacklog, false, false);
+ return getStatsAsync(topic, getStatsOptions);
+ }
+
+ @Override
+ public CompletableFuture getStatsAsync(String topic, GetStatsOptions getStatsOptions) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "stats")
- .queryParam("getPreciseBacklog", getPreciseBacklog)
- .queryParam("subscriptionBacklogSize", subscriptionBacklogSize)
- .queryParam("getEarliestTimeInBacklog", getEarliestTimeInBacklog);
+ .queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog())
+ .queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize())
+ .queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog())
+ .queryParam("excludePublishers", getStatsOptions.isExcludePublishers())
+ .queryParam("excludeConsumers", getStatsOptions.isExcludeConsumers());
final CompletableFuture future = new CompletableFuture<>();
InvocationCallback persistentCB = new InvocationCallback() {
@@ -622,16 +628,16 @@ public void failed(Throwable throwable) {
InvocationCallback nonpersistentCB =
new InvocationCallback() {
- @Override
- public void completed(NonPersistentTopicStats response) {
- future.complete(response);
- }
+ @Override
+ public void completed(NonPersistentTopicStats response) {
+ future.complete(response);
+ }
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- };
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ };
if (topic.startsWith(TopicDomain.non_persistent.value())) {
asyncGetRequest(path, nonpersistentCB);
@@ -685,34 +691,50 @@ public PartitionedTopicStats getPartitionedStats(String topic, boolean perPartit
subscriptionBacklogSize, getEarliestTimeInBacklog));
}
+ @Override
+ public PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition,
+ GetStatsOptions getStatsOptions) throws PulsarAdminException {
+ return sync(()-> getPartitionedStatsAsync(topic, perPartition, getStatsOptions));
+ }
+
@Override
public CompletableFuture getPartitionedStatsAsync(String topic,
boolean perPartition, boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
+ GetStatsOptions getStatsOptions = new GetStatsOptions(getPreciseBacklog, subscriptionBacklogSize,
+ getEarliestTimeInBacklog, false, false);
+ return getPartitionedStatsAsync(topic, perPartition, getStatsOptions);
+ }
+
+ @Override
+ public CompletableFuture getPartitionedStatsAsync(String topic, boolean perPartition,
+ GetStatsOptions getStatsOptions) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitioned-stats");
path = path.queryParam("perPartition", perPartition)
- .queryParam("getPreciseBacklog", getPreciseBacklog)
- .queryParam("subscriptionBacklogSize", subscriptionBacklogSize)
- .queryParam("getEarliestTimeInBacklog", getEarliestTimeInBacklog);
+ .queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog())
+ .queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize())
+ .queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog())
+ .queryParam("excludePublishers", getStatsOptions.isExcludePublishers())
+ .queryParam("excludeConsumers", getStatsOptions.isExcludeConsumers());
final CompletableFuture future = new CompletableFuture<>();
InvocationCallback nonpersistentCB =
new InvocationCallback() {
- @Override
- public void completed(NonPersistentPartitionedTopicStats response) {
- if (!perPartition) {
- response.getPartitions().clear();
- }
- future.complete(response);
- }
+ @Override
+ public void completed(NonPersistentPartitionedTopicStats response) {
+ if (!perPartition) {
+ response.getPartitions().clear();
+ }
+ future.complete(response);
+ }
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- };
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ };
InvocationCallback persistentCB = new InvocationCallback() {
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
index 755b2c89c8f20..1b5878ff06c19 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
@@ -74,6 +74,7 @@ public class KafkaSinkConfig implements Serializable {
@FieldDoc(
defaultValue = "",
+ sensitive = true,
help = "The password for the trust store file.")
private String sslTruststorePassword;
@@ -126,4 +127,4 @@ public static KafkaSinkConfig load(Map map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(mapper.writeValueAsString(map), KafkaSinkConfig.class);
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
index 5de60d2a028c8..7065458649c83 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
@@ -75,6 +75,7 @@ public class KafkaSourceConfig implements Serializable {
@FieldDoc(
defaultValue = "",
+ sensitive = true,
help = "The password for the trust store file.")
private String sslTruststorePassword;
@@ -156,4 +157,4 @@ public static KafkaSourceConfig load(Map map) throws IOException
mapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
return mapper.readValue(mapper.writeValueAsString(map), KafkaSourceConfig.class);
}
-}
\ No newline at end of file
+}