Skip to content

Commit

Permalink
feat: configurable expiration policy on auto-created subscriptions (#…
Browse files Browse the repository at this point in the history
…2876) (@jmitash) (#2897)

* feat: configurable expiration policy on auto-created subscriptions (#2894)

* Add properties for Binder expiration policy

* Set Expiration Policy when creating Subscription

* Treat non-positive durations as "never expire"

* Use Google Java formatting

* docs: add expiration policy section to pubsub

* chore: access autocreate value from properties

* chore: fix test

* docs: correct never expire logic

* chore: add example of setting expirationPolicy.ttl to pubsub-stream functional sample

* docs: update stream binder docs with configuration options

* Update docs/src/main/asciidoc/pubsub.adoc

Co-authored-by: Mike Eltsufin <meltsufin@google.com>

* Update docs/src/main/asciidoc/pubsub.adoc

Co-authored-by: Mike Eltsufin <meltsufin@google.com>

* Update docs/src/main/asciidoc/pubsub.adoc

Co-authored-by: Mike Eltsufin <meltsufin@google.com>

* Update docs/src/main/asciidoc/spring-stream.adoc

Co-authored-by: Mike Eltsufin <meltsufin@google.com>

* Update docs/src/main/asciidoc/spring-stream.adoc

Co-authored-by: Mike Eltsufin <meltsufin@google.com>

* Update docs/src/main/asciidoc/spring-stream.adoc

Co-authored-by: Mike Eltsufin <meltsufin@google.com>

---------

Co-authored-by: Jacob Mitash <jacob@mitash.org>
Co-authored-by: Mike Eltsufin <meltsufin@google.com>
  • Loading branch information
3 people authored May 21, 2024
1 parent 0f94e07 commit d8d7168
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 45 deletions.
20 changes: 20 additions & 0 deletions docs/src/main/asciidoc/pubsub.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,26 @@ public Subscription newSubscription() {

Dead letter topics are no different from any other topic, though some https://cloud.google.com/pubsub/docs/dead-letter-topics#granting_forwarding_permissions[additional permissions] are necessary to ensure the Cloud Pub/Sub service can successfully `ack` the original message and re-`publish` on the dead letter topic.

===== Expiration Policy

By default, subscriptions expire after 31 days without any subscriber activity or subscription property changes.
A `null` or unset `ExpirationPolicy` will use the default.

To create a subscription that never expires, provide an `ExpirationPolicy` without setting the TTL.

See
https://cloud.google.com/pubsub/docs/subscription-properties#expiration_period[Expiration Period] for more information.

[source,java,indent=0]
----
public Subscription newSubscription() {
return pubSubAdmin.createSubscription(Subscription.newBuilder()
.setName(SUBSCRIPTION_NAME)
.setTopic(TOPIC_NAME)
.setExpirationPolicy(ExpirationPolicy.newBuilder().build())); // Never expire
}
----

==== JSON support

For serialization and deserialization of POJOs using Jackson JSON, configure a `PubSubMessageConverter` bean, and the Spring Boot starter for Spring Framework on Google Cloud Pub/Sub will automatically wire it into the `PubSubTemplate`.
Expand Down
73 changes: 49 additions & 24 deletions docs/src/main/asciidoc/spring-stream.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,46 @@ If you are using Pub/Sub autoconfiguration from the Spring Framework on Google C

NOTE: To use this binder with a https://cloud.google.com/pubsub/docs/emulator[running emulator], configure its host and port via `spring.cloud.gcp.pubsub.emulator-host`.

==== Producer Synchronous Sending Configuration
==== Producer/Consumer Shared Configuration
These properties can be applied to both producers and consumers.
|===
| Name | Description | Required | Default value
| `spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME/PRODUCER_NAME}.[consumer/producer].allowedHeaders` | Filters incoming and outgoing messages to contain only the provided comma-delimited headers | No |
| `spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME/PRODUCER_NAME}.[consumer/producer].auto-create-resources` | When enabled, topic/subscription will be created if they do not exist | No | true
|===

===== Header Mapping
You can filter incoming and outgoing message headers with `allowHeaders` property.
For example, for a consumer to allow only two headers, provide a comma separated list like this:

.application.properties
----
spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.allowedHeaders=allowed1,allowed2
----
Where `CONSUMER_NAME` should be replaced by the method which is consuming/reading messages from Cloud Pub/Sub and `allowed1,allowed2` is the comma separated list of headers that the user wants to keep.



A similar style is applicable for producers as well. For example:

.application.properties
----
spring.cloud.stream.gcp.pubsub.bindings.{PRODUCER_NAME}.producer.allowedHeaders=allowed3,allowed4
----
Where `PRODUCER_NAME` should be replaced by the method which is producing/sending messages to Cloud Pub/Sub and `allowed3,allowed4` is the comma separated list of headers that user wants to map. All other headers will be removed before the message is sent to Cloud Pub/Sub.


==== Producer-specific Configuration
|===
| Name | Description | Required | Default value
| `spring.cloud.stream.gcp.pubsub.bindings.{PRODUCER_NAME}.producer.sync` | Enables synchronous sending | No | false
|===

===== Producer Synchronous Sending Configuration
By default, this binder will send messages to Cloud Pub/Sub asynchronously.
If synchronous sending is preferred (for example, to allow propagating errors back to the sender), set `spring.cloud.stream.gcp.pubsub.default.producer.sync` property to `true`.

==== Producer Destination Configuration

===== Producer Destination Configuration
If automatic resource creation is turned ON and the topic corresponding to the destination name does not exist, it will be created.

For example, for the following configuration, a topic called `myEvents` would be created.
Expand All @@ -60,8 +94,19 @@ spring.cloud.stream.bindings.{PRODUCER_NAME}.destination=myEvents
spring.cloud.stream.gcp.pubsub.bindings.{PRODUCER_NAME}.producer.auto-create-resources=true
----

==== Consumer Destination Configuration

==== Consumer-specific Configuration
|===
| Name | Description | Required | Default value
| `spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.ack-mode` | Controls how messages will be acknowledged when they are successfully received. Options: AUTO, AUTO_ACK, and MANUAL | No | AUTO
| `spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.dead-letter-policy.dead-letter-topic` | Duration of no activity after which a subscription will expire. Use 0d to never expire. | No | 31d
| `spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.dead-letter-policy.max-delivery-attempts` | Duration of no activity after which a subscription will expire. Use 0d to never expire. | No | 31d
| `spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.expiration-policy.ttl` | Duration of no activity after which a subscription will expire. Use 0d to never expire. | No | 31d
| `spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.max-fetch-size` | Limits the number of messages received per poll | No |
| `spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.subscription-name` | When provided, uses the given subscription name | No |
|===

===== Consumer Destination Configuration
A `PubSubInboundChannelAdapter` will be configured for your consumer endpoint.
You may adjust the ack mode of the consumer endpoint using the `ack-mode` property.
The ack mode controls how messages will be acknowledged when they are successfully received.
Expand Down Expand Up @@ -109,26 +154,6 @@ These resources will be created:
* A topic named `myEvents`
* A subscription named `myEvents.consumerGroup1`

==== Header Mapping
You can filter incoming and outgoing message headers with `allowHeaders` property.
For example, for a consumer to allow only two headers, provide a comma separated list like this:

.application.properties
----
spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.allowedHeaders=allowed1, allowed2
----
Where `CONSUMER_NAME` should be replaced by the method which is consuming/reading messages from Cloud Pub/Sub and allowed1, allowed2 is the comma separated list of headers that the user wants to keep.



A similar style is applicable for producers as well. For example:

.application.properties
----
spring.cloud.stream.gcp.pubsub.bindings.{PRODUCER_NAME}.producer.allowedHeaders=allowed3,allowed4
----
Where `PRODUCER_NAME` should be replaced by the method which is producing/sending messages to Cloud Pub/Sub and allowed3, allowed4 is the comma separated list of headers that user wants to map. All other headers will be removed before the message is sent to Cloud Pub/Sub.



==== Endpoint Customization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spring.stream.binder.pubsub.properties;

import com.google.cloud.spring.pubsub.integration.AckMode;
import java.time.Duration;

/** Consumer properties for Pub/Sub. */
public class PubSubConsumerProperties extends PubSubCommonProperties {
Expand All @@ -29,6 +30,15 @@ public class PubSubConsumerProperties extends PubSubCommonProperties {

private DeadLetterPolicy deadLetterPolicy = null;

/**
* Policy for how soon the subscription should be deleted after no activity.
*
* <p>Note, a null or unset {@code expirationPolicy} will use the Google-provided default of 31
* days TTL. To set no expiration, provide an {@code expirationPolicy} with a zero-duration (e.g.
* 0d) {@link ExpirationPolicy#ttl}.
*/
private ExpirationPolicy expirationPolicy = null;

public AckMode getAckMode() {
return ackMode;
}
Expand Down Expand Up @@ -61,6 +71,14 @@ public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
this.deadLetterPolicy = deadLetterPolicy;
}

public ExpirationPolicy getExpirationPolicy() {
return expirationPolicy;
}

public void setExpirationPolicy(ExpirationPolicy expirationPolicy) {
this.expirationPolicy = expirationPolicy;
}

public static class DeadLetterPolicy {
private String deadLetterTopic;

Expand All @@ -82,4 +100,25 @@ public void setMaxDeliveryAttempts(Integer maxDeliveryAttempts) {
this.maxDeliveryAttempts = maxDeliveryAttempts;
}
}

public static class ExpirationPolicy {
/**
* How long the subscription can have no activity before it is automatically deleted.
*
* <p>Provide an Expiration Policy with a zero (e.g. 0d) {@code ttl} to never expire.
*/
private Duration ttl;

public Duration getTtl() {
if (ttl != null && (ttl.isZero() || ttl.isNegative())) {
// non-positive is treated as "never expire"
return null;
}
return ttl;
}

public void setTtl(Duration ttl) {
this.ttl = ttl;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubConsumerProperties;
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubProducerProperties;
import com.google.pubsub.v1.DeadLetterPolicy;
import com.google.pubsub.v1.ExpirationPolicy;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
Expand Down Expand Up @@ -71,8 +72,6 @@ public ConsumerDestination provisionConsumerDestination(
String customName = properties.getExtension().getSubscriptionName();

boolean autoCreate = properties.getExtension().isAutoCreateResources();
PubSubConsumerProperties.DeadLetterPolicy deadLetterPolicy =
properties.getExtension().getDeadLetterPolicy();

// topicName may be either the short or fully-qualified version.
String topicShortName =
Expand Down Expand Up @@ -101,7 +100,7 @@ public ConsumerDestination provisionConsumerDestination(
subscriptionName = "anonymous." + topicShortName + "." + UUID.randomUUID();
this.anonymousGroupSubscriptionNames.add(subscriptionName);
}
ensureSubscriptionExists(subscriptionName, topicName, deadLetterPolicy, autoCreate);
ensureSubscriptionExists(subscriptionName, topicName, properties.getExtension());
}

Assert.hasText(subscriptionName, "Subscription Name cannot be null or empty");
Expand Down Expand Up @@ -142,28 +141,27 @@ Topic ensureTopicExists(String topicName, boolean autoCreate) {
Subscription ensureSubscriptionExists(
String subscriptionName,
String topicName,
PubSubConsumerProperties.DeadLetterPolicy deadLetterPolicy,
boolean autoCreate) {
PubSubConsumerProperties properties) {
Subscription subscription = this.pubSubAdmin.getSubscription(subscriptionName);
if (subscription == null) {
return createSubscription(subscriptionName, topicName, deadLetterPolicy, autoCreate);
return createSubscription(subscriptionName, topicName, properties);
}
return subscription;
}

private Subscription createSubscription(
String subscriptionName,
String topicName,
PubSubConsumerProperties.DeadLetterPolicy deadLetterPolicy,
boolean autoCreate) {
PubSubConsumerProperties properties) {
Subscription.Builder builder =
Subscription.newBuilder().setName(subscriptionName).setTopic(topicName);

PubSubConsumerProperties.DeadLetterPolicy deadLetterPolicy = properties.getDeadLetterPolicy();
if (deadLetterPolicy != null) {
String dlTopicName = deadLetterPolicy.getDeadLetterTopic();
Assert.hasText(dlTopicName, "Dead letter policy cannot have null or empty topic");

Topic dlTopic = ensureTopicExists(dlTopicName, autoCreate);
Topic dlTopic = ensureTopicExists(dlTopicName, properties.isAutoCreateResources());

DeadLetterPolicy.Builder dlpBuilder =
DeadLetterPolicy.newBuilder().setDeadLetterTopic(dlTopic.getName());
Expand All @@ -175,6 +173,19 @@ private Subscription createSubscription(
builder.setDeadLetterPolicy(dlpBuilder);
}

PubSubConsumerProperties.ExpirationPolicy expirationPolicy = properties.getExpirationPolicy();
if (expirationPolicy != null) {
ExpirationPolicy.Builder epBuilder = ExpirationPolicy.newBuilder();

if (expirationPolicy.getTtl() != null) {
long desiredSeconds = expirationPolicy.getTtl().getSeconds();
epBuilder.setTtl(
com.google.protobuf.Duration.newBuilder().setSeconds(desiredSeconds).build());
}

builder.setExpirationPolicy(epBuilder);
}

return this.pubSubAdmin.createSubscription(builder);
}
}
Loading

0 comments on commit d8d7168

Please sign in to comment.