diff --git a/docs/src/main/asciidoc/pubsub.adoc b/docs/src/main/asciidoc/pubsub.adoc index 053506bf42..e3505e14fe 100644 --- a/docs/src/main/asciidoc/pubsub.adoc +++ b/docs/src/main/asciidoc/pubsub.adoc @@ -426,6 +426,26 @@ public Subscription newSubscription() { Dead letter topics are no different from any other topic, though some https://cloud.google.com/pubsub/docs/dead-letter-topics#granting_forwarding_permissions[additional permissions] are necessary to ensure the Cloud Pub/Sub service can successfully `ack` the original message and re-`publish` on the dead letter topic. +===== Expiration Policy + +By default, subscriptions expire after 31 days without any subscriber activity or subscription property changes. +A `null` or unset `ExpirationPolicy` will use the default. + +To create a subscription that never expires, provide an `ExpirationPolicy` without setting the TTL. + +See +https://cloud.google.com/pubsub/docs/subscription-properties#expiration_period[Expiration Period] for more information. + +[source,java,indent=0] +---- +public Subscription newSubscription() { + return pubSubAdmin.createSubscription(Subscription.newBuilder() + .setName(SUBSCRIPTION_NAME) + .setTopic(TOPIC_NAME) + .setExpirationPolicy(ExpirationPolicy.newBuilder().build())); // Never expire +} +---- + ==== JSON support For serialization and deserialization of POJOs using Jackson JSON, configure a `PubSubMessageConverter` bean, and the Spring Boot starter for Spring Framework on Google Cloud Pub/Sub will automatically wire it into the `PubSubTemplate`. diff --git a/docs/src/main/asciidoc/spring-stream.adoc b/docs/src/main/asciidoc/spring-stream.adoc index 814d34cf12..060408edb3 100644 --- a/docs/src/main/asciidoc/spring-stream.adoc +++ b/docs/src/main/asciidoc/spring-stream.adoc @@ -43,12 +43,46 @@ If you are using Pub/Sub autoconfiguration from the Spring Framework on Google C NOTE: To use this binder with a https://cloud.google.com/pubsub/docs/emulator[running emulator], configure its host and port via `spring.cloud.gcp.pubsub.emulator-host`. -==== Producer Synchronous Sending Configuration +==== Producer/Consumer Shared Configuration +These properties can be applied to both producers and consumers. +|=== +| Name | Description | Required | Default value +| `spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME/PRODUCER_NAME}.[consumer/producer].allowedHeaders` | Filters incoming and outgoing messages to contain only the provided comma-delimited headers | No | +| `spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME/PRODUCER_NAME}.[consumer/producer].auto-create-resources` | When enabled, topic/subscription will be created if they do not exist | No | true +|=== + +===== Header Mapping +You can filter incoming and outgoing message headers with `allowHeaders` property. +For example, for a consumer to allow only two headers, provide a comma separated list like this: + +.application.properties +---- +spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.allowedHeaders=allowed1,allowed2 +---- +Where `CONSUMER_NAME` should be replaced by the method which is consuming/reading messages from Cloud Pub/Sub and `allowed1,allowed2` is the comma separated list of headers that the user wants to keep. + + + +A similar style is applicable for producers as well. For example: + +.application.properties +---- +spring.cloud.stream.gcp.pubsub.bindings.{PRODUCER_NAME}.producer.allowedHeaders=allowed3,allowed4 +---- +Where `PRODUCER_NAME` should be replaced by the method which is producing/sending messages to Cloud Pub/Sub and `allowed3,allowed4` is the comma separated list of headers that user wants to map. All other headers will be removed before the message is sent to Cloud Pub/Sub. + + +==== Producer-specific Configuration +|=== +| Name | Description | Required | Default value +| `spring.cloud.stream.gcp.pubsub.bindings.{PRODUCER_NAME}.producer.sync` | Enables synchronous sending | No | false +|=== + +===== Producer Synchronous Sending Configuration By default, this binder will send messages to Cloud Pub/Sub asynchronously. If synchronous sending is preferred (for example, to allow propagating errors back to the sender), set `spring.cloud.stream.gcp.pubsub.default.producer.sync` property to `true`. -==== Producer Destination Configuration - +===== Producer Destination Configuration If automatic resource creation is turned ON and the topic corresponding to the destination name does not exist, it will be created. For example, for the following configuration, a topic called `myEvents` would be created. @@ -60,8 +94,19 @@ spring.cloud.stream.bindings.{PRODUCER_NAME}.destination=myEvents spring.cloud.stream.gcp.pubsub.bindings.{PRODUCER_NAME}.producer.auto-create-resources=true ---- -==== Consumer Destination Configuration +==== Consumer-specific Configuration +|=== +| Name | Description | Required | Default value +| `spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.ack-mode` | Controls how messages will be acknowledged when they are successfully received. Options: AUTO, AUTO_ACK, and MANUAL | No | AUTO +| `spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.dead-letter-policy.dead-letter-topic` | Duration of no activity after which a subscription will expire. Use 0d to never expire. | No | 31d +| `spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.dead-letter-policy.max-delivery-attempts` | Duration of no activity after which a subscription will expire. Use 0d to never expire. | No | 31d +| `spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.expiration-policy.ttl` | Duration of no activity after which a subscription will expire. Use 0d to never expire. | No | 31d +| `spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.max-fetch-size` | Limits the number of messages received per poll | No | +| `spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.subscription-name` | When provided, uses the given subscription name | No | +|=== + +===== Consumer Destination Configuration A `PubSubInboundChannelAdapter` will be configured for your consumer endpoint. You may adjust the ack mode of the consumer endpoint using the `ack-mode` property. The ack mode controls how messages will be acknowledged when they are successfully received. @@ -109,26 +154,6 @@ These resources will be created: * A topic named `myEvents` * A subscription named `myEvents.consumerGroup1` -==== Header Mapping -You can filter incoming and outgoing message headers with `allowHeaders` property. -For example, for a consumer to allow only two headers, provide a comma separated list like this: - -.application.properties ----- -spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.allowedHeaders=allowed1, allowed2 ----- -Where `CONSUMER_NAME` should be replaced by the method which is consuming/reading messages from Cloud Pub/Sub and allowed1, allowed2 is the comma separated list of headers that the user wants to keep. - - - -A similar style is applicable for producers as well. For example: - -.application.properties ----- -spring.cloud.stream.gcp.pubsub.bindings.{PRODUCER_NAME}.producer.allowedHeaders=allowed3,allowed4 ----- -Where `PRODUCER_NAME` should be replaced by the method which is producing/sending messages to Cloud Pub/Sub and allowed3, allowed4 is the comma separated list of headers that user wants to map. All other headers will be removed before the message is sent to Cloud Pub/Sub. - ==== Endpoint Customization diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/properties/PubSubConsumerProperties.java b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/properties/PubSubConsumerProperties.java index 2894ebc06c..96c57104a1 100644 --- a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/properties/PubSubConsumerProperties.java +++ b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/properties/PubSubConsumerProperties.java @@ -17,6 +17,7 @@ package com.google.cloud.spring.stream.binder.pubsub.properties; import com.google.cloud.spring.pubsub.integration.AckMode; +import java.time.Duration; /** Consumer properties for Pub/Sub. */ public class PubSubConsumerProperties extends PubSubCommonProperties { @@ -29,6 +30,15 @@ public class PubSubConsumerProperties extends PubSubCommonProperties { private DeadLetterPolicy deadLetterPolicy = null; + /** + * Policy for how soon the subscription should be deleted after no activity. + * + *
Note, a null or unset {@code expirationPolicy} will use the Google-provided default of 31 + * days TTL. To set no expiration, provide an {@code expirationPolicy} with a zero-duration (e.g. + * 0d) {@link ExpirationPolicy#ttl}. + */ + private ExpirationPolicy expirationPolicy = null; + public AckMode getAckMode() { return ackMode; } @@ -61,6 +71,14 @@ public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy) { this.deadLetterPolicy = deadLetterPolicy; } + public ExpirationPolicy getExpirationPolicy() { + return expirationPolicy; + } + + public void setExpirationPolicy(ExpirationPolicy expirationPolicy) { + this.expirationPolicy = expirationPolicy; + } + public static class DeadLetterPolicy { private String deadLetterTopic; @@ -82,4 +100,25 @@ public void setMaxDeliveryAttempts(Integer maxDeliveryAttempts) { this.maxDeliveryAttempts = maxDeliveryAttempts; } } + + public static class ExpirationPolicy { + /** + * How long the subscription can have no activity before it is automatically deleted. + * + *
Provide an Expiration Policy with a zero (e.g. 0d) {@code ttl} to never expire.
+ */
+ private Duration ttl;
+
+ public Duration getTtl() {
+ if (ttl != null && (ttl.isZero() || ttl.isNegative())) {
+ // non-positive is treated as "never expire"
+ return null;
+ }
+ return ttl;
+ }
+
+ public void setTtl(Duration ttl) {
+ this.ttl = ttl;
+ }
+ }
}
diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java
index ca3ba52679..0b84db43dd 100644
--- a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java
+++ b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java
@@ -21,6 +21,7 @@
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubConsumerProperties;
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubProducerProperties;
import com.google.pubsub.v1.DeadLetterPolicy;
+import com.google.pubsub.v1.ExpirationPolicy;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
@@ -71,8 +72,6 @@ public ConsumerDestination provisionConsumerDestination(
String customName = properties.getExtension().getSubscriptionName();
boolean autoCreate = properties.getExtension().isAutoCreateResources();
- PubSubConsumerProperties.DeadLetterPolicy deadLetterPolicy =
- properties.getExtension().getDeadLetterPolicy();
// topicName may be either the short or fully-qualified version.
String topicShortName =
@@ -101,7 +100,7 @@ public ConsumerDestination provisionConsumerDestination(
subscriptionName = "anonymous." + topicShortName + "." + UUID.randomUUID();
this.anonymousGroupSubscriptionNames.add(subscriptionName);
}
- ensureSubscriptionExists(subscriptionName, topicName, deadLetterPolicy, autoCreate);
+ ensureSubscriptionExists(subscriptionName, topicName, properties.getExtension());
}
Assert.hasText(subscriptionName, "Subscription Name cannot be null or empty");
@@ -142,11 +141,10 @@ Topic ensureTopicExists(String topicName, boolean autoCreate) {
Subscription ensureSubscriptionExists(
String subscriptionName,
String topicName,
- PubSubConsumerProperties.DeadLetterPolicy deadLetterPolicy,
- boolean autoCreate) {
+ PubSubConsumerProperties properties) {
Subscription subscription = this.pubSubAdmin.getSubscription(subscriptionName);
if (subscription == null) {
- return createSubscription(subscriptionName, topicName, deadLetterPolicy, autoCreate);
+ return createSubscription(subscriptionName, topicName, properties);
}
return subscription;
}
@@ -154,16 +152,16 @@ Subscription ensureSubscriptionExists(
private Subscription createSubscription(
String subscriptionName,
String topicName,
- PubSubConsumerProperties.DeadLetterPolicy deadLetterPolicy,
- boolean autoCreate) {
+ PubSubConsumerProperties properties) {
Subscription.Builder builder =
Subscription.newBuilder().setName(subscriptionName).setTopic(topicName);
+ PubSubConsumerProperties.DeadLetterPolicy deadLetterPolicy = properties.getDeadLetterPolicy();
if (deadLetterPolicy != null) {
String dlTopicName = deadLetterPolicy.getDeadLetterTopic();
Assert.hasText(dlTopicName, "Dead letter policy cannot have null or empty topic");
- Topic dlTopic = ensureTopicExists(dlTopicName, autoCreate);
+ Topic dlTopic = ensureTopicExists(dlTopicName, properties.isAutoCreateResources());
DeadLetterPolicy.Builder dlpBuilder =
DeadLetterPolicy.newBuilder().setDeadLetterTopic(dlTopic.getName());
@@ -175,6 +173,19 @@ private Subscription createSubscription(
builder.setDeadLetterPolicy(dlpBuilder);
}
+ PubSubConsumerProperties.ExpirationPolicy expirationPolicy = properties.getExpirationPolicy();
+ if (expirationPolicy != null) {
+ ExpirationPolicy.Builder epBuilder = ExpirationPolicy.newBuilder();
+
+ if (expirationPolicy.getTtl() != null) {
+ long desiredSeconds = expirationPolicy.getTtl().getSeconds();
+ epBuilder.setTtl(
+ com.google.protobuf.Duration.newBuilder().setSeconds(desiredSeconds).build());
+ }
+
+ builder.setExpirationPolicy(epBuilder);
+ }
+
return this.pubSubAdmin.createSubscription(builder);
}
}
diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/test/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisionerTests.java b/spring-cloud-gcp-pubsub-stream-binder/src/test/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisionerTests.java
index 64212bcb2a..a134afeeaf 100644
--- a/spring-cloud-gcp-pubsub-stream-binder/src/test/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisionerTests.java
+++ b/spring-cloud-gcp-pubsub-stream-binder/src/test/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisionerTests.java
@@ -35,6 +35,7 @@
import com.google.pubsub.v1.DeadLetterPolicy;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
+import org.assertj.core.data.Offset;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -200,6 +201,76 @@ void testProvisionConsumerDestination_deadLetterQueue() {
assertThat(policy.getMaxDeliveryAttempts()).isEqualTo(12);
}
+ @Test
+ void testProvisionConsumerDestination_expirationPolicyNoneSet() {
+ when(this.pubSubConsumerProperties.getExpirationPolicy()).thenReturn(null);
+
+ when(this.pubSubAdminMock.getTopic("topic_A")).thenReturn(null);
+ when(this.pubSubAdminMock.createTopic("topic_A"))
+ .thenReturn(Topic.newBuilder().setName("projects/test-project/topics/topic_A").build());
+
+ this.pubSubChannelProvisioner.provisionConsumerDestination(
+ "topic_A", "group_A", this.extendedConsumerProperties);
+
+ ArgumentCaptor