From 588784f7274d2e592120546384c2827c26f0b84a Mon Sep 17 00:00:00 2001 From: Chip Pate Date: Fri, 16 Aug 2024 14:16:39 -0400 Subject: [PATCH] checkpoint: subscription.v1 --- .../terra/appmanager/events/ChartEvents.java | 9 ++ .../events/client/MessageProcessor.java | 6 + .../common/events/client/PubsubClient.java | 13 ++- .../events/client/PubsubClientFactory.java | 21 +++- .../EventTopicMustBeAlreadyCreated.java | 2 +- .../client/google/GooglePubsubClient.java | 106 ++++++++++++++++-- .../events/client/google/PublisherDao.java | 12 +- .../common/events/config/PubsubConfig.java | 2 +- .../common/events/topics/ChartTopic.java | 13 +-- .../common/events/topics/EventSubscriber.java | 8 ++ .../common/events/topics/EventTopic.java | 29 +++-- .../events/topics/messages/EventFactory.java | 3 + .../events/topics/messages/EventMessage.java | 44 +++++--- .../topics/messages/charts/ChartCreated.java | 6 +- .../topics/messages/charts/ChartDeleted.java | 6 +- .../topics/messages/charts/ChartMessage.java | 34 +++++- .../topics/messages/charts/ChartUpdated.java | 8 +- .../topics/messages/EventMessageTest.java | 12 +- 18 files changed, 263 insertions(+), 71 deletions(-) create mode 100644 service/src/main/java/bio/terra/common/events/client/MessageProcessor.java create mode 100644 service/src/main/java/bio/terra/common/events/topics/EventSubscriber.java create mode 100644 service/src/main/java/bio/terra/common/events/topics/messages/EventFactory.java diff --git a/service/src/main/java/bio/terra/appmanager/events/ChartEvents.java b/service/src/main/java/bio/terra/appmanager/events/ChartEvents.java index 293dbc8..c366444 100644 --- a/service/src/main/java/bio/terra/appmanager/events/ChartEvents.java +++ b/service/src/main/java/bio/terra/appmanager/events/ChartEvents.java @@ -3,11 +3,20 @@ import bio.terra.common.events.client.PubsubClientFactory; import bio.terra.common.events.config.PubsubConfig; import bio.terra.common.events.topics.ChartTopic; +import bio.terra.common.events.topics.messages.EventMessage; import org.springframework.stereotype.Repository; @Repository public class ChartEvents extends ChartTopic { public ChartEvents(PubsubConfig config, PubsubClientFactory factory) { super(config, factory); + // TODO: default to this::receive if you don't provide one + subscribe(this::receive); + } + + @Override + public boolean process(EventMessage message) { + System.out.println("Received message: " + message.eventType); + return true; } } diff --git a/service/src/main/java/bio/terra/common/events/client/MessageProcessor.java b/service/src/main/java/bio/terra/common/events/client/MessageProcessor.java new file mode 100644 index 0000000..dc4f8fc --- /dev/null +++ b/service/src/main/java/bio/terra/common/events/client/MessageProcessor.java @@ -0,0 +1,6 @@ +package bio.terra.common.events.client; + +@FunctionalInterface +public interface MessageProcessor { + boolean process(String jsonString); +} diff --git a/service/src/main/java/bio/terra/common/events/client/PubsubClient.java b/service/src/main/java/bio/terra/common/events/client/PubsubClient.java index aaa1450..f0f86e3 100644 --- a/service/src/main/java/bio/terra/common/events/client/PubsubClient.java +++ b/service/src/main/java/bio/terra/common/events/client/PubsubClient.java @@ -3,13 +3,20 @@ import java.io.Closeable; /** - * The purpose of this class is to represent the client to the cloud-specific pubsub infrastructure. + * The purpose of this class is to represent the client to the cloud-specific pubsub infrastructure + * for a single topic. * *

To create an instance of this class, please see {@link PubsubClientFactory} + * + *

The PubsubClient is responsible for ensuring the following conditions: + * + *

*/ public abstract class PubsubClient implements Closeable { - public abstract void publish(byte[] message); + public abstract void publish(String message); - public abstract void subscribe(); + public abstract void subscribe(MessageProcessor process); } diff --git a/service/src/main/java/bio/terra/common/events/client/PubsubClientFactory.java b/service/src/main/java/bio/terra/common/events/client/PubsubClientFactory.java index 2d37053..213e70b 100644 --- a/service/src/main/java/bio/terra/common/events/client/PubsubClientFactory.java +++ b/service/src/main/java/bio/terra/common/events/client/PubsubClientFactory.java @@ -2,6 +2,7 @@ import bio.terra.common.events.client.google.GooglePubsubClient; import bio.terra.common.events.config.PubsubConfig; +import bio.terra.common.events.topics.EventSubscriber; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -29,15 +30,17 @@ public PubsubClientFactory(PubsubConfig config) { this.pubsubConfig = config; } - public PubsubClient createPubsubClient(String topicName) { + public PubsubClient createPubsubClient( + String topicName, String serviceName, EventSubscriber subscriber) { return new GooglePubsubClient( pubsubConfig.googleConfig().projectId(), - formatTopicName(topicName), + formatTopicId(topicName), + formatSubscriptionId(serviceName, topicName), pubsubConfig.createTopic()); } - private String formatTopicName(String topic) { - List parts = new ArrayList<>(Arrays.asList("event", topic)); + private String formatTopicId(String topicName) { + List parts = new ArrayList<>(Arrays.asList("event", topicName)); if (pubsubConfig.nameSuffix() != null) { parts.add(pubsubConfig.nameSuffix()); @@ -45,4 +48,14 @@ private String formatTopicName(String topic) { return String.join("-", parts); } + + private String formatSubscriptionId(String serviceName, String topicName) { + if (serviceName == null) { + return null; + } + + List parts = new ArrayList<>(Arrays.asList("subscription", serviceName, topicName)); + + return String.join("-", parts); + } } diff --git a/service/src/main/java/bio/terra/common/events/client/google/EventTopicMustBeAlreadyCreated.java b/service/src/main/java/bio/terra/common/events/client/google/EventTopicMustBeAlreadyCreated.java index 4220e17..a0408e2 100644 --- a/service/src/main/java/bio/terra/common/events/client/google/EventTopicMustBeAlreadyCreated.java +++ b/service/src/main/java/bio/terra/common/events/client/google/EventTopicMustBeAlreadyCreated.java @@ -20,7 +20,7 @@ public EventTopicMustBeAlreadyCreated(String projectId) { /** * This is called when running in the Production environment Verify the topic exists or generate a - * ConfigurationError # Then return the TopicName + * ConfigurationError then return the TopicName * * @param name * @return TopicName for the Event topic for Production diff --git a/service/src/main/java/bio/terra/common/events/client/google/GooglePubsubClient.java b/service/src/main/java/bio/terra/common/events/client/google/GooglePubsubClient.java index c6e039e..bc4951d 100644 --- a/service/src/main/java/bio/terra/common/events/client/google/GooglePubsubClient.java +++ b/service/src/main/java/bio/terra/common/events/client/google/GooglePubsubClient.java @@ -1,10 +1,22 @@ package bio.terra.common.events.client.google; +import bio.terra.common.events.client.MessageProcessor; import bio.terra.common.events.client.PubsubClient; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import java.io.IOException; -import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.naming.ConfigurationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,32 +30,67 @@ public class GooglePubsubClient extends PubsubClient { private static final Logger logger = LoggerFactory.getLogger(GooglePubsubClient.class); private String projectId; - private String topicName; + private String topicId; + private String subscriptionId; private Publisher publisher; + private Subscriber subscriber; - public GooglePubsubClient(String projectId, String topicName, boolean createTopic) { + public GooglePubsubClient( + String projectId, String topicId, String subscriptionId, boolean createTopic) { this.projectId = projectId; - this.topicName = topicName; - publisher = buildPublisher(projectId, topicName, createTopic); + this.topicId = topicId; + this.subscriptionId = subscriptionId; + publisher = buildPublisher(projectId, topicId, createTopic); } @Override - public void publish(byte[] message) { + public void publish(String message) { if (logger.isDebugEnabled()) { - logger.debug(new String(message, StandardCharsets.UTF_8)); + logger.debug(message); } // TODO: remove me - logger.info(new String(message, StandardCharsets.UTF_8)); + logger.info(message); + ByteString data = ByteString.copyFromUtf8(message); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); + + ApiFuture future = publisher.publish(pubsubMessage); + ApiFutures.addCallback(future, makePublishCallback(message), MoreExecutors.directExecutor()); } @Override - public void subscribe() {} + public void subscribe(MessageProcessor processor) { + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + // Instantiate an asynchronous message receiver. + MessageReceiver receiver = + (PubsubMessage message, AckReplyConsumer consumer) -> { + // Handle incoming message, then ack the received message. + String eventMsg = message.getData().toStringUtf8(); + System.out.println("Id: " + message.getMessageId()); + System.out.println("Data: " + message.getData().toStringUtf8()); + if (processor.process(eventMsg)) { + consumer.ack(); + } else { + consumer.nack(); + } + }; + + subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + // Start the subscriber. + subscriber.startAsync().awaitRunning(); + System.out.printf("Listening for messages on %s:\n", subscriptionName.toString()); + } @Override - public void close() throws IOException {} + public void close() throws IOException { + closePublisher(); + closeSubscriber(); + } private Publisher buildPublisher(String projectId, String topicName, boolean createTopic) { try { + logger.info("Building events publisher: " + projectId + ":" + topicName); TopicName topic = verifyTopic(projectId, topicName, createTopic); return Publisher.newBuilder(topic).build(); } catch (IOException | ConfigurationException e) { @@ -51,6 +98,30 @@ private Publisher buildPublisher(String projectId, String topicName, boolean cre } } + private void closePublisher() { + if (publisher != null) { + logger.info("Stopping events publisher: " + projectId + ":" + topicId); + publisher.shutdown(); + try { + publisher.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private void closeSubscriber() { + if (subscriber != null) { + // Allow the subscriber to run for 30s unless an unrecoverable error occurs. + subscriber.startAsync(); + try { + subscriber.awaitTerminated(1, TimeUnit.MINUTES); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + } + } + private TopicName verifyTopic(String projectId, String topicName, boolean createTopic) throws IOException, ConfigurationException { EventTopicName topicCreator = null; @@ -61,4 +132,19 @@ private TopicName verifyTopic(String projectId, String topicName, boolean create } return topicCreator.verifyTopicName(topicName); } + + private ApiFutureCallback makePublishCallback(String message) { + return new ApiFutureCallback<>() { + @Override + public void onFailure(Throwable throwable) { + logger.error("Error publishing message : " + message, throwable); + } + + @Override + public void onSuccess(String messageId) { + // Once published, returns server-assigned message ids (unique within the topic) + logger.info("Published message ID: " + messageId); + } + }; + } } diff --git a/service/src/main/java/bio/terra/common/events/client/google/PublisherDao.java b/service/src/main/java/bio/terra/common/events/client/google/PublisherDao.java index eef5df0..c9f5f48 100644 --- a/service/src/main/java/bio/terra/common/events/client/google/PublisherDao.java +++ b/service/src/main/java/bio/terra/common/events/client/google/PublisherDao.java @@ -8,7 +8,6 @@ import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; import java.io.Closeable; -import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,14 +55,5 @@ public void onSuccess(String messageId) { } @Override - public void close() { - if (publisher != null) { - publisher.shutdown(); - try { - publisher.awaitTermination(1, TimeUnit.MINUTES); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } + public void close() {} } diff --git a/service/src/main/java/bio/terra/common/events/config/PubsubConfig.java b/service/src/main/java/bio/terra/common/events/config/PubsubConfig.java index d9eb4a6..75c1257 100644 --- a/service/src/main/java/bio/terra/common/events/config/PubsubConfig.java +++ b/service/src/main/java/bio/terra/common/events/config/PubsubConfig.java @@ -37,7 +37,7 @@ public GoogleConfig googleConfig() { } public String nameSuffix() { - if (beeConfig != null) { + if (beeConfig != null && beeConfig.isActive()) { return beeConfig.name(); } return null; diff --git a/service/src/main/java/bio/terra/common/events/topics/ChartTopic.java b/service/src/main/java/bio/terra/common/events/topics/ChartTopic.java index 9ae794f..e6f47e8 100644 --- a/service/src/main/java/bio/terra/common/events/topics/ChartTopic.java +++ b/service/src/main/java/bio/terra/common/events/topics/ChartTopic.java @@ -7,16 +7,14 @@ import bio.terra.common.events.topics.messages.charts.ChartMessage; import bio.terra.common.events.topics.messages.charts.ChartUpdated; import org.jetbrains.annotations.NotNull; -import org.springframework.stereotype.Repository; import org.springframework.web.util.UriComponentsBuilder; -@Repository -public class ChartTopic extends EventTopic { +public abstract class ChartTopic extends EventTopic { - private String publishedBy; + protected String publishedBy; public ChartTopic(PubsubConfig config, PubsubClientFactory clientFactory) { - super(clientFactory.createPubsubClient("charts")); + super(clientFactory, "charts", config.publishedBy()); publishedBy = config.publishedBy(); } @@ -28,11 +26,6 @@ private static String buildEntityUrl(String entityId) { .toUriString(); } - @Override - protected Boolean process(ChartMessage message) { - return Boolean.TRUE; - } - public void chartCreated(String entityId) { publish(new ChartCreated(publishedBy, entityId, buildEntityUrl(entityId))); } diff --git a/service/src/main/java/bio/terra/common/events/topics/EventSubscriber.java b/service/src/main/java/bio/terra/common/events/topics/EventSubscriber.java new file mode 100644 index 0000000..d32f809 --- /dev/null +++ b/service/src/main/java/bio/terra/common/events/topics/EventSubscriber.java @@ -0,0 +1,8 @@ +package bio.terra.common.events.topics; + +import bio.terra.common.events.topics.messages.EventMessage; + +@FunctionalInterface +public interface EventSubscriber { + boolean processEvent(String jsonString); +} diff --git a/service/src/main/java/bio/terra/common/events/topics/EventTopic.java b/service/src/main/java/bio/terra/common/events/topics/EventTopic.java index 360136e..1754a3b 100644 --- a/service/src/main/java/bio/terra/common/events/topics/EventTopic.java +++ b/service/src/main/java/bio/terra/common/events/topics/EventTopic.java @@ -1,30 +1,43 @@ package bio.terra.common.events.topics; +import bio.terra.common.events.client.MessageProcessor; import bio.terra.common.events.client.PubsubClient; +import bio.terra.common.events.client.PubsubClientFactory; import bio.terra.common.events.topics.messages.EventMessage; import com.fasterxml.jackson.core.JsonProcessingException; -import java.nio.charset.StandardCharsets; /** This class is responsible interacting with the PubsubClient (both publish and subscribe). */ public abstract class EventTopic { private PubsubClient client; - public EventTopic(PubsubClient client) { - this.client = client; + public EventTopic(PubsubClientFactory clientFactory, String topicName, String serviceName) { + client = clientFactory.createPubsubClient(topicName, serviceName, this::receive); } public void publish(T message) { try { - client.publish(message.toJson().getBytes(StandardCharsets.UTF_8)); + client.publish(message.toJson()); } catch (JsonProcessingException e) { System.out.println("ERROR: unable to publish message"); } } - public void subscribe() { - T message = null; - process(message); + public void subscribe(MessageProcessor processor) { + client.subscribe(processor); + } + + protected boolean receive(String message) { + try { + T msg = (T) EventMessage.fromJson(message); + return process(msg); + } catch (JsonProcessingException e) { + System.out.println(e.getMessage()); + // TODO: what to do with bad messages + } + + // TODO: this needs to be changed back to false + return true; } /** @@ -34,5 +47,5 @@ public void subscribe() { * @param message * @return */ - protected abstract Boolean process(T message); + public abstract boolean process(EventMessage message); } diff --git a/service/src/main/java/bio/terra/common/events/topics/messages/EventFactory.java b/service/src/main/java/bio/terra/common/events/topics/messages/EventFactory.java new file mode 100644 index 0000000..5f25dd1 --- /dev/null +++ b/service/src/main/java/bio/terra/common/events/topics/messages/EventFactory.java @@ -0,0 +1,3 @@ +package bio.terra.common.events.topics.messages; + +public class EventFactory {} diff --git a/service/src/main/java/bio/terra/common/events/topics/messages/EventMessage.java b/service/src/main/java/bio/terra/common/events/topics/messages/EventMessage.java index 2e3f975..4325190 100644 --- a/service/src/main/java/bio/terra/common/events/topics/messages/EventMessage.java +++ b/service/src/main/java/bio/terra/common/events/topics/messages/EventMessage.java @@ -19,12 +19,6 @@ @JsonInclude(JsonInclude.Include.NON_NULL) public class EventMessage { - public static enum TYPES { - CHART_CREATED, - CHART_UPDATED, - CHART_DELETED - } - /** * This is the current version of the EventMessage schema, as defined in the spec (see class-level * javadoc). @@ -49,7 +43,7 @@ public static enum TYPES { String jobId; @JsonProperty("event_type") - TYPES eventType; + public EventTypes eventType; @JsonProperty("entity_id") String entityId; @@ -95,7 +89,7 @@ protected EventMessage( String publishedBy, Map context, String jobId, - TYPES eventType, + EventTypes eventType, String entityId, String entityUrl, Map properties) { @@ -118,7 +112,7 @@ public EventMessage( String publishedBy, Map context, String jobId, - TYPES eventType, + EventTypes eventType, String entityId, String entityUrl, Map properties) { @@ -135,14 +129,22 @@ public EventMessage( properties); } - public EventMessage(String publishedBy, TYPES eventType, String entityId, String entityUrl) { + public EventMessage(String publishedBy, EventTypes eventType, String entityId, String entityUrl) { this(publishedBy, null, null, eventType, entityId, entityUrl, null); } - public String toJson() throws JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); - var jsonMsg = mapper.writeValueAsString(this); - return jsonMsg; + protected EventMessage(EventMessage event) { + this( + event.id, + event.version, + event.publishedAt, + event.publishedBy, + event.context, + event.jobId, + event.eventType, + event.entityId, + event.entityUrl, + event.properties); } public static EventMessage fromJson(String jsonMessage) throws JsonProcessingException { @@ -151,11 +153,17 @@ public static EventMessage fromJson(String jsonMessage) throws JsonProcessingExc return em; } + public String toJson() throws JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + var jsonMsg = mapper.writeValueAsString(this); + return jsonMsg; + } + /** * Ensure required parameters are set on object. * * @return true if object is valid, false otherwise. - * @see #EventMessage(String,String,Date,String,Map,String,TYPES,String,String,Map) + * @see #EventMessage(String, String, Date, String, Map, String, EventTypes, String, String, Map) */ private void confirmNonNull() { @@ -170,4 +178,10 @@ private void confirmNonNull() { version, publishedBy, eventType, entityId, entityUrl)); } } + + public static enum EventTypes { + CHART_CREATED, + CHART_UPDATED, + CHART_DELETED + } } diff --git a/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartCreated.java b/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartCreated.java index f2f1d30..d8ea20a 100644 --- a/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartCreated.java +++ b/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartCreated.java @@ -4,6 +4,10 @@ public class ChartCreated extends ChartMessage { public ChartCreated(String publishedBy, String entityId, String entityUrl) { - super(publishedBy, EventMessage.TYPES.CHART_CREATED, entityId, entityUrl); + super(publishedBy, EventTypes.CHART_CREATED, entityId, entityUrl); + } + + public ChartCreated(EventMessage event) { + super(event); } } diff --git a/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartDeleted.java b/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartDeleted.java index 87399a9..b6b48bf 100644 --- a/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartDeleted.java +++ b/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartDeleted.java @@ -4,6 +4,10 @@ public class ChartDeleted extends ChartMessage { public ChartDeleted(String publishedBy, String entityId, String entityUrl) { - super(publishedBy, EventMessage.TYPES.CHART_DELETED, entityId, entityUrl); + super(publishedBy, EventTypes.CHART_DELETED, entityId, entityUrl); + } + + public ChartDeleted(EventMessage event) { + super(event); } } diff --git a/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartMessage.java b/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartMessage.java index d7b7e79..42c9cda 100644 --- a/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartMessage.java +++ b/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartMessage.java @@ -1,10 +1,42 @@ package bio.terra.common.events.topics.messages.charts; import bio.terra.common.events.topics.messages.EventMessage; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.Arrays; +import java.util.List; public abstract class ChartMessage extends EventMessage { - public ChartMessage(String publishedBy, TYPES eventType, String entityId, String entityUrl) { + private static List CHART_TYPES = + Arrays.asList(EventTypes.CHART_CREATED, EventTypes.CHART_UPDATED, EventTypes.CHART_DELETED); + + public ChartMessage(String publishedBy, EventTypes eventType, String entityId, String entityUrl) { super(publishedBy, eventType, entityId, entityUrl); } + + public ChartMessage(EventMessage event) { + super(event); + // if (!CHART_TYPES.contains(event.eventType)) { + // throw new IllegalArgumentException( + // MessageFormat.format( + // "unexpected eventType({1}) while creating Chart event", event.eventType)); + // } + } + + public static ChartMessage fromJson(String json) throws JsonProcessingException { + EventMessage event = EventMessage.fromJson(json); + switch (event.eventType) { + case CHART_CREATED: + return new ChartCreated(event); + case CHART_UPDATED: + return new ChartUpdated(event); + case CHART_DELETED: + return new ChartDeleted(event); + // default: + // throw new IllegalStateException( + // MessageFormat.format("unexpected EventType while processing Chart events: + // {1}", json)); + } + return null; + } } diff --git a/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartUpdated.java b/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartUpdated.java index 7144310..d97d28f 100644 --- a/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartUpdated.java +++ b/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartUpdated.java @@ -1,7 +1,13 @@ package bio.terra.common.events.topics.messages.charts; +import bio.terra.common.events.topics.messages.EventMessage; + public class ChartUpdated extends ChartMessage { public ChartUpdated(String publishedBy, String entityId, String entityUrl) { - super(publishedBy, TYPES.CHART_UPDATED, entityId, entityUrl); + super(publishedBy, EventTypes.CHART_UPDATED, entityId, entityUrl); + } + + public ChartUpdated(EventMessage event) { + super(event); } } diff --git a/service/src/test/java/bio/terra/common/events/topics/messages/EventMessageTest.java b/service/src/test/java/bio/terra/common/events/topics/messages/EventMessageTest.java index fb61c0d..653f12d 100644 --- a/service/src/test/java/bio/terra/common/events/topics/messages/EventMessageTest.java +++ b/service/src/test/java/bio/terra/common/events/topics/messages/EventMessageTest.java @@ -20,7 +20,8 @@ public class EventMessageTest { @Test public void verifyToJson() throws Exception { EventMessage em = - new EventMessage("publisher", EventMessage.TYPES.CHART_CREATED, "entity-id", "entity-url"); + new EventMessage( + "publisher", EventMessage.EventTypes.CHART_CREATED, "entity-id", "entity-url"); ObjectMapper mapper = new ObjectMapper(); validateRequiredAttributes(em, mapper.readTree(em.toJson())); @@ -41,7 +42,8 @@ public void requiredParams_publishedBy() { assertThrows( IllegalArgumentException.class, () -> { - new EventMessage(null, EventMessage.TYPES.CHART_CREATED, "entity-id", "entity-url"); + new EventMessage( + null, EventMessage.EventTypes.CHART_CREATED, "entity-id", "entity-url"); }); assertTrue(exception.getMessage().contains("publishedBy(null)")); } @@ -63,7 +65,8 @@ public void requiredParams_entityId() { assertThrows( IllegalArgumentException.class, () -> { - new EventMessage("publisher", EventMessage.TYPES.CHART_CREATED, null, "entity-url"); + new EventMessage( + "publisher", EventMessage.EventTypes.CHART_CREATED, null, "entity-url"); }); assertTrue(exception.getMessage().contains("entityId(null)")); } @@ -74,7 +77,8 @@ public void requiredParams_entityUrl() { assertThrows( IllegalArgumentException.class, () -> { - new EventMessage("publisher", EventMessage.TYPES.CHART_CREATED, "entity-id", null); + new EventMessage( + "publisher", EventMessage.EventTypes.CHART_CREATED, "entity-id", null); }); assertTrue(exception.getMessage().contains("entityUrl(null)")); }