Skip to content

Commit

Permalink
chore: prepare pubsub integration json sample tests for parallel runs (
Browse files Browse the repository at this point in the history
…#2118)

* chore: prepare pubsub integration json sample tests for parallel runs
  • Loading branch information
mpeddada1 authored Aug 11, 2023
1 parent 109f4ba commit 3506288
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
Expand All @@ -39,20 +40,25 @@ public class ReceiverConfiguration {

private static final Log LOGGER = LogFactory.getLog(ReceiverConfiguration.class);

private static final String SUBSCRIPTION_NAME = "json-payload-sample-subscription";

private final ArrayList<Person> processedPersonsList = new ArrayList<>();

@Bean
public String subscriptionName(@Value("${subscriptionName}") String subscriptionName) {
return subscriptionName;
}

@Bean
public DirectChannel pubSubInputChannel() {
return new DirectChannel();
}

@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
@Qualifier("pubSubInputChannel") MessageChannel inputChannel, PubSubTemplate pubSubTemplate) {
@Qualifier("pubSubInputChannel") MessageChannel inputChannel,
PubSubTemplate pubSubTemplate,
@Qualifier("subscriptionName") String subscriptionName) {
PubSubInboundChannelAdapter adapter =
new PubSubInboundChannelAdapter(pubSubTemplate, SUBSCRIPTION_NAME);
new PubSubInboundChannelAdapter(pubSubTemplate, subscriptionName);
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
adapter.setPayloadType(Person.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
Expand All @@ -33,7 +35,10 @@ public class SenderConfiguration {

private static final Log LOGGER = LogFactory.getLog(SenderConfiguration.class);

private static final String TOPIC_NAME = "json-payload-sample-topic";
@Bean
public String topicName(@Value("${topicName}") String topicName) {
return topicName;
}

@Bean
public DirectChannel pubSubOutputChannel() {
Expand All @@ -42,10 +47,12 @@ public DirectChannel pubSubOutputChannel() {

@Bean
@ServiceActivator(inputChannel = "pubSubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
PubSubMessageHandler adapter = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
public MessageHandler messageSender(
PubSubTemplate pubSubTemplate, @Qualifier("topicName") String topicName) {
PubSubMessageHandler adapter = new PubSubMessageHandler(pubSubTemplate, topicName);
adapter.setSuccessCallback((ackId, message) -> LOGGER.info("Message was sent successfully."));
adapter.setFailureCallback((cause, message) -> LOGGER.info("There was an error sending the message."));
adapter.setFailureCallback(
(cause, message) -> LOGGER.info("There was an error sending the message."));
return adapter;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
subscriptionName=json-payload-sample-subscription
topicName=json-payload-sample-topic
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,21 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.spring.core.util.MapBuilder;
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -34,6 +44,7 @@
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;

/** Tests the Pub/Sub Json payload app. */
Expand All @@ -42,9 +53,19 @@
@ExtendWith(SpringExtension.class)
@SpringBootTest(
webEnvironment = WebEnvironment.RANDOM_PORT,
classes = {PubSubJsonPayloadApplication.class})
properties = {"spring.main.allow-bean-definition-overriding=true"},
classes = {PubSubJsonPayloadTestConfiguration.class, PubSubJsonPayloadApplication.class})
@TestPropertySource(locations = "classpath:application-test.properties")
class PubSubJsonPayloadSampleApplicationIntegrationTests {

private static final String PROJECT_NAME =
ProjectName.of(ServiceOptions.getDefaultProjectId()).getProject();
@Autowired private Subscription testSubscription;
@Autowired private PubSubTemplate pubSubTemplate;
@Autowired private Topic testTopic;
@Autowired private TopicAdminClient topicAdminClient;
@Autowired private SubscriptionAdminClient subscriptionAdminClient;

@Autowired private TestRestTemplate testRestTemplate;

@Test
Expand Down Expand Up @@ -72,4 +93,34 @@ void testReceivesJsonPayload() {
assertThat(response.getBody()).containsOnlyOnce(new Person("Bob", age));
});
}

@AfterEach
void cleanUp() {
List<String> projectTopics = fetchTopicNamesFromProject();
String topicName = testTopic.getName();
if (projectTopics.contains(topicName)) {
this.topicAdminClient.deleteTopic(topicName);
}
List<String> projectSubscriptions = fetchSubscriptionNamesFromProject();
String subscriptionName = testSubscription.getName();
if (projectSubscriptions.contains(subscriptionName)) {
this.subscriptionAdminClient.deleteSubscription(subscriptionName);
}
}

private List<String> fetchTopicNamesFromProject() {
TopicAdminClient.ListTopicsPagedResponse listTopicsResponse =
topicAdminClient.listTopics("projects/" + PROJECT_NAME);
return StreamSupport.stream(listTopicsResponse.iterateAll().spliterator(), false)
.map(Topic::getName)
.collect(Collectors.toList());
}

private List<String> fetchSubscriptionNamesFromProject() {
SubscriptionAdminClient.ListSubscriptionsPagedResponse response =
subscriptionAdminClient.listSubscriptions("projects/" + PROJECT_NAME);
return StreamSupport.stream(response.iterateAll().spliterator(), false)
.map(Subscription::getName)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;

public class PubSubJsonPayloadTestConfiguration {

private final TopicAdminClient topicAdminClient;

private final SubscriptionAdminClient subscriptionAdminClient;

public PubSubJsonPayloadTestConfiguration(
TopicAdminClient topicAdminClient, SubscriptionAdminClient subscriptionAdminClient) {
this.topicAdminClient = topicAdminClient;
this.subscriptionAdminClient = subscriptionAdminClient;
}

@Bean
public Topic createTopic(@Qualifier("topicName") String topicName) {
String projectName = ProjectName.of(ServiceOptions.getDefaultProjectId()).getProject();
return topicAdminClient.createTopic(TopicName.of(projectName, topicName));
}

@Bean
public Subscription createSubscription(
@Qualifier("topicName") String topicName,
@Qualifier("subscriptionName") String subscriptionName) {
String projectName = ProjectName.of(ServiceOptions.getDefaultProjectId()).getProject();
return subscriptionAdminClient.createSubscription(
SubscriptionName.of(projectName, subscriptionName),
TopicName.of(projectName, topicName),
PushConfig.getDefaultInstance(),
10);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
topicName=pubsub-json-payload-sample-topic-${random.uuid}
subscriptionName=pubsub-json-payload-sample-subscription-${random.uuid}

0 comments on commit 3506288

Please sign in to comment.