Skip to content

Commit

Permalink
checkpoint: subscription.v1
Browse files Browse the repository at this point in the history
  • Loading branch information
cpate4 committed Aug 16, 2024
1 parent 3bd65cb commit 588784f
Show file tree
Hide file tree
Showing 18 changed files with 263 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,20 @@
import bio.terra.common.events.client.PubsubClientFactory;
import bio.terra.common.events.config.PubsubConfig;
import bio.terra.common.events.topics.ChartTopic;
import bio.terra.common.events.topics.messages.EventMessage;
import org.springframework.stereotype.Repository;

@Repository
public class ChartEvents extends ChartTopic {
public ChartEvents(PubsubConfig config, PubsubClientFactory factory) {
super(config, factory);
// TODO: default to this::receive if you don't provide one
subscribe(this::receive);
}

@Override
public boolean process(EventMessage message) {
System.out.println("Received message: " + message.eventType);
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package bio.terra.common.events.client;

@FunctionalInterface
public interface MessageProcessor {
boolean process(String jsonString);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@
import java.io.Closeable;

/**
* The purpose of this class is to represent the client to the cloud-specific pubsub infrastructure.
* The purpose of this class is to represent the client to the cloud-specific pubsub infrastructure
* for a single topic.
*
* <p>To create an instance of this class, please see {@link PubsubClientFactory}
*
* <p>The PubsubClient is responsible for ensuring the following conditions:
*
* <ul>
* <li>the topic exists
* </ul>
*/
public abstract class PubsubClient implements Closeable {

public abstract void publish(byte[] message);
public abstract void publish(String message);

public abstract void subscribe();
public abstract void subscribe(MessageProcessor process);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import bio.terra.common.events.client.google.GooglePubsubClient;
import bio.terra.common.events.config.PubsubConfig;
import bio.terra.common.events.topics.EventSubscriber;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -29,20 +30,32 @@ public PubsubClientFactory(PubsubConfig config) {
this.pubsubConfig = config;
}

public PubsubClient createPubsubClient(String topicName) {
public PubsubClient createPubsubClient(
String topicName, String serviceName, EventSubscriber subscriber) {
return new GooglePubsubClient(
pubsubConfig.googleConfig().projectId(),
formatTopicName(topicName),
formatTopicId(topicName),
formatSubscriptionId(serviceName, topicName),
pubsubConfig.createTopic());
}

private String formatTopicName(String topic) {
List<String> parts = new ArrayList<>(Arrays.asList("event", topic));
private String formatTopicId(String topicName) {
List<String> parts = new ArrayList<>(Arrays.asList("event", topicName));

if (pubsubConfig.nameSuffix() != null) {
parts.add(pubsubConfig.nameSuffix());
}

return String.join("-", parts);
}

private String formatSubscriptionId(String serviceName, String topicName) {
if (serviceName == null) {
return null;
}

List<String> parts = new ArrayList<>(Arrays.asList("subscription", serviceName, topicName));

return String.join("-", parts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public EventTopicMustBeAlreadyCreated(String projectId) {

/**
* This is called when running in the Production environment Verify the topic exists or generate a
* ConfigurationError # Then return the TopicName
* ConfigurationError then return the TopicName
*
* @param name
* @return TopicName for the Event topic for Production
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
package bio.terra.common.events.client.google;

import bio.terra.common.events.client.MessageProcessor;
import bio.terra.common.events.client.PubsubClient;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.naming.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -18,39 +30,98 @@ public class GooglePubsubClient extends PubsubClient {
private static final Logger logger = LoggerFactory.getLogger(GooglePubsubClient.class);

private String projectId;
private String topicName;
private String topicId;
private String subscriptionId;
private Publisher publisher;
private Subscriber subscriber;

public GooglePubsubClient(String projectId, String topicName, boolean createTopic) {
public GooglePubsubClient(
String projectId, String topicId, String subscriptionId, boolean createTopic) {
this.projectId = projectId;
this.topicName = topicName;
publisher = buildPublisher(projectId, topicName, createTopic);
this.topicId = topicId;
this.subscriptionId = subscriptionId;
publisher = buildPublisher(projectId, topicId, createTopic);
}

@Override
public void publish(byte[] message) {
public void publish(String message) {
if (logger.isDebugEnabled()) {
logger.debug(new String(message, StandardCharsets.UTF_8));
logger.debug(message);
}
// TODO: remove me
logger.info(new String(message, StandardCharsets.UTF_8));
logger.info(message);
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

ApiFuture<String> future = publisher.publish(pubsubMessage);
ApiFutures.addCallback(future, makePublishCallback(message), MoreExecutors.directExecutor());
}

@Override
public void subscribe() {}
public void subscribe(MessageProcessor processor) {
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);

// Instantiate an asynchronous message receiver.
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
// Handle incoming message, then ack the received message.
String eventMsg = message.getData().toStringUtf8();
System.out.println("Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
if (processor.process(eventMsg)) {
consumer.ack();
} else {
consumer.nack();
}
};

subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
// Start the subscriber.
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
}

@Override
public void close() throws IOException {}
public void close() throws IOException {
closePublisher();
closeSubscriber();
}

private Publisher buildPublisher(String projectId, String topicName, boolean createTopic) {
try {
logger.info("Building events publisher: " + projectId + ":" + topicName);
TopicName topic = verifyTopic(projectId, topicName, createTopic);
return Publisher.newBuilder(topic).build();
} catch (IOException | ConfigurationException e) {
throw new RuntimeException(e);
}
}

private void closePublisher() {
if (publisher != null) {
logger.info("Stopping events publisher: " + projectId + ":" + topicId);
publisher.shutdown();
try {
publisher.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

private void closeSubscriber() {
if (subscriber != null) {
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
subscriber.startAsync();
try {
subscriber.awaitTerminated(1, TimeUnit.MINUTES);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}

private TopicName verifyTopic(String projectId, String topicName, boolean createTopic)
throws IOException, ConfigurationException {
EventTopicName topicCreator = null;
Expand All @@ -61,4 +132,19 @@ private TopicName verifyTopic(String projectId, String topicName, boolean create
}
return topicCreator.verifyTopicName(topicName);
}

private ApiFutureCallback<String> makePublishCallback(String message) {
return new ApiFutureCallback<>() {
@Override
public void onFailure(Throwable throwable) {
logger.error("Error publishing message : " + message, throwable);
}

@Override
public void onSuccess(String messageId) {
// Once published, returns server-assigned message ids (unique within the topic)
logger.info("Published message ID: " + messageId);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.io.Closeable;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -56,14 +55,5 @@ public void onSuccess(String messageId) {
}

@Override
public void close() {
if (publisher != null) {
publisher.shutdown();
try {
publisher.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public GoogleConfig googleConfig() {
}

public String nameSuffix() {
if (beeConfig != null) {
if (beeConfig != null && beeConfig.isActive()) {
return beeConfig.name();
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,14 @@
import bio.terra.common.events.topics.messages.charts.ChartMessage;
import bio.terra.common.events.topics.messages.charts.ChartUpdated;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Repository;
import org.springframework.web.util.UriComponentsBuilder;

@Repository
public class ChartTopic extends EventTopic<ChartMessage> {
public abstract class ChartTopic extends EventTopic<ChartMessage> {

private String publishedBy;
protected String publishedBy;

public ChartTopic(PubsubConfig config, PubsubClientFactory clientFactory) {
super(clientFactory.createPubsubClient("charts"));
super(clientFactory, "charts", config.publishedBy());
publishedBy = config.publishedBy();
}

Expand All @@ -28,11 +26,6 @@ private static String buildEntityUrl(String entityId) {
.toUriString();
}

@Override
protected Boolean process(ChartMessage message) {
return Boolean.TRUE;
}

public void chartCreated(String entityId) {
publish(new ChartCreated(publishedBy, entityId, buildEntityUrl(entityId)));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package bio.terra.common.events.topics;

import bio.terra.common.events.topics.messages.EventMessage;

@FunctionalInterface
public interface EventSubscriber<T extends EventMessage> {
boolean processEvent(String jsonString);
}
Original file line number Diff line number Diff line change
@@ -1,30 +1,43 @@
package bio.terra.common.events.topics;

import bio.terra.common.events.client.MessageProcessor;
import bio.terra.common.events.client.PubsubClient;
import bio.terra.common.events.client.PubsubClientFactory;
import bio.terra.common.events.topics.messages.EventMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.nio.charset.StandardCharsets;

/** This class is responsible interacting with the PubsubClient (both publish and subscribe). */
public abstract class EventTopic<T extends EventMessage> {

private PubsubClient client;

public EventTopic(PubsubClient client) {
this.client = client;
public EventTopic(PubsubClientFactory clientFactory, String topicName, String serviceName) {
client = clientFactory.createPubsubClient(topicName, serviceName, this::receive);
}

public void publish(T message) {
try {
client.publish(message.toJson().getBytes(StandardCharsets.UTF_8));
client.publish(message.toJson());
} catch (JsonProcessingException e) {
System.out.println("ERROR: unable to publish message");
}
}

public void subscribe() {
T message = null;
process(message);
public void subscribe(MessageProcessor processor) {
client.subscribe(processor);
}

protected boolean receive(String message) {
try {
T msg = (T) EventMessage.fromJson(message);
return process(msg);
} catch (JsonProcessingException e) {
System.out.println(e.getMessage());
// TODO: what to do with bad messages
}

// TODO: this needs to be changed back to false
return true;
}

/**
Expand All @@ -34,5 +47,5 @@ public void subscribe() {
* @param message
* @return
*/
protected abstract Boolean process(T message);
public abstract boolean process(EventMessage message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package bio.terra.common.events.topics.messages;

public class EventFactory {}
Loading

0 comments on commit 588784f

Please sign in to comment.