diff --git a/service/src/main/java/bio/terra/appmanager/events/ChartEvents.java b/service/src/main/java/bio/terra/appmanager/events/ChartEvents.java
index 293dbc8..c366444 100644
--- a/service/src/main/java/bio/terra/appmanager/events/ChartEvents.java
+++ b/service/src/main/java/bio/terra/appmanager/events/ChartEvents.java
@@ -3,11 +3,20 @@
import bio.terra.common.events.client.PubsubClientFactory;
import bio.terra.common.events.config.PubsubConfig;
import bio.terra.common.events.topics.ChartTopic;
+import bio.terra.common.events.topics.messages.EventMessage;
import org.springframework.stereotype.Repository;
@Repository
public class ChartEvents extends ChartTopic {
public ChartEvents(PubsubConfig config, PubsubClientFactory factory) {
super(config, factory);
+ // TODO: default to this::receive if you don't provide one
+ subscribe(this::receive);
+ }
+
+ @Override
+ public boolean process(EventMessage message) {
+ System.out.println("Received message: " + message.eventType);
+ return true;
}
}
diff --git a/service/src/main/java/bio/terra/common/events/client/MessageProcessor.java b/service/src/main/java/bio/terra/common/events/client/MessageProcessor.java
new file mode 100644
index 0000000..dc4f8fc
--- /dev/null
+++ b/service/src/main/java/bio/terra/common/events/client/MessageProcessor.java
@@ -0,0 +1,6 @@
+package bio.terra.common.events.client;
+
+@FunctionalInterface
+public interface MessageProcessor {
+ boolean process(String jsonString);
+}
diff --git a/service/src/main/java/bio/terra/common/events/client/PubsubClient.java b/service/src/main/java/bio/terra/common/events/client/PubsubClient.java
index aaa1450..f0f86e3 100644
--- a/service/src/main/java/bio/terra/common/events/client/PubsubClient.java
+++ b/service/src/main/java/bio/terra/common/events/client/PubsubClient.java
@@ -3,13 +3,20 @@
import java.io.Closeable;
/**
- * The purpose of this class is to represent the client to the cloud-specific pubsub infrastructure.
+ * The purpose of this class is to represent the client to the cloud-specific pubsub infrastructure
+ * for a single topic.
*
*
To create an instance of this class, please see {@link PubsubClientFactory}
+ *
+ *
The PubsubClient is responsible for ensuring the following conditions:
+ *
+ *
*/
public abstract class PubsubClient implements Closeable {
- public abstract void publish(byte[] message);
+ public abstract void publish(String message);
- public abstract void subscribe();
+ public abstract void subscribe(MessageProcessor process);
}
diff --git a/service/src/main/java/bio/terra/common/events/client/PubsubClientFactory.java b/service/src/main/java/bio/terra/common/events/client/PubsubClientFactory.java
index 2d37053..213e70b 100644
--- a/service/src/main/java/bio/terra/common/events/client/PubsubClientFactory.java
+++ b/service/src/main/java/bio/terra/common/events/client/PubsubClientFactory.java
@@ -2,6 +2,7 @@
import bio.terra.common.events.client.google.GooglePubsubClient;
import bio.terra.common.events.config.PubsubConfig;
+import bio.terra.common.events.topics.EventSubscriber;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -29,15 +30,17 @@ public PubsubClientFactory(PubsubConfig config) {
this.pubsubConfig = config;
}
- public PubsubClient createPubsubClient(String topicName) {
+ public PubsubClient createPubsubClient(
+ String topicName, String serviceName, EventSubscriber subscriber) {
return new GooglePubsubClient(
pubsubConfig.googleConfig().projectId(),
- formatTopicName(topicName),
+ formatTopicId(topicName),
+ formatSubscriptionId(serviceName, topicName),
pubsubConfig.createTopic());
}
- private String formatTopicName(String topic) {
- List parts = new ArrayList<>(Arrays.asList("event", topic));
+ private String formatTopicId(String topicName) {
+ List parts = new ArrayList<>(Arrays.asList("event", topicName));
if (pubsubConfig.nameSuffix() != null) {
parts.add(pubsubConfig.nameSuffix());
@@ -45,4 +48,14 @@ private String formatTopicName(String topic) {
return String.join("-", parts);
}
+
+ private String formatSubscriptionId(String serviceName, String topicName) {
+ if (serviceName == null) {
+ return null;
+ }
+
+ List parts = new ArrayList<>(Arrays.asList("subscription", serviceName, topicName));
+
+ return String.join("-", parts);
+ }
}
diff --git a/service/src/main/java/bio/terra/common/events/client/google/EventTopicMustBeAlreadyCreated.java b/service/src/main/java/bio/terra/common/events/client/google/EventTopicMustBeAlreadyCreated.java
index 4220e17..a0408e2 100644
--- a/service/src/main/java/bio/terra/common/events/client/google/EventTopicMustBeAlreadyCreated.java
+++ b/service/src/main/java/bio/terra/common/events/client/google/EventTopicMustBeAlreadyCreated.java
@@ -20,7 +20,7 @@ public EventTopicMustBeAlreadyCreated(String projectId) {
/**
* This is called when running in the Production environment Verify the topic exists or generate a
- * ConfigurationError # Then return the TopicName
+ * ConfigurationError then return the TopicName
*
* @param name
* @return TopicName for the Event topic for Production
diff --git a/service/src/main/java/bio/terra/common/events/client/google/GooglePubsubClient.java b/service/src/main/java/bio/terra/common/events/client/google/GooglePubsubClient.java
index c6e039e..bc4951d 100644
--- a/service/src/main/java/bio/terra/common/events/client/google/GooglePubsubClient.java
+++ b/service/src/main/java/bio/terra/common/events/client/google/GooglePubsubClient.java
@@ -1,10 +1,22 @@
package bio.terra.common.events.client.google;
+import bio.terra.common.events.client.MessageProcessor;
import bio.terra.common.events.client.PubsubClient;
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.naming.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -18,32 +30,67 @@ public class GooglePubsubClient extends PubsubClient {
private static final Logger logger = LoggerFactory.getLogger(GooglePubsubClient.class);
private String projectId;
- private String topicName;
+ private String topicId;
+ private String subscriptionId;
private Publisher publisher;
+ private Subscriber subscriber;
- public GooglePubsubClient(String projectId, String topicName, boolean createTopic) {
+ public GooglePubsubClient(
+ String projectId, String topicId, String subscriptionId, boolean createTopic) {
this.projectId = projectId;
- this.topicName = topicName;
- publisher = buildPublisher(projectId, topicName, createTopic);
+ this.topicId = topicId;
+ this.subscriptionId = subscriptionId;
+ publisher = buildPublisher(projectId, topicId, createTopic);
}
@Override
- public void publish(byte[] message) {
+ public void publish(String message) {
if (logger.isDebugEnabled()) {
- logger.debug(new String(message, StandardCharsets.UTF_8));
+ logger.debug(message);
}
// TODO: remove me
- logger.info(new String(message, StandardCharsets.UTF_8));
+ logger.info(message);
+ ByteString data = ByteString.copyFromUtf8(message);
+ PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
+
+ ApiFuture future = publisher.publish(pubsubMessage);
+ ApiFutures.addCallback(future, makePublishCallback(message), MoreExecutors.directExecutor());
}
@Override
- public void subscribe() {}
+ public void subscribe(MessageProcessor processor) {
+ ProjectSubscriptionName subscriptionName =
+ ProjectSubscriptionName.of(projectId, subscriptionId);
+
+ // Instantiate an asynchronous message receiver.
+ MessageReceiver receiver =
+ (PubsubMessage message, AckReplyConsumer consumer) -> {
+ // Handle incoming message, then ack the received message.
+ String eventMsg = message.getData().toStringUtf8();
+ System.out.println("Id: " + message.getMessageId());
+ System.out.println("Data: " + message.getData().toStringUtf8());
+ if (processor.process(eventMsg)) {
+ consumer.ack();
+ } else {
+ consumer.nack();
+ }
+ };
+
+ subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
+ // Start the subscriber.
+ subscriber.startAsync().awaitRunning();
+ System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
+ }
@Override
- public void close() throws IOException {}
+ public void close() throws IOException {
+ closePublisher();
+ closeSubscriber();
+ }
private Publisher buildPublisher(String projectId, String topicName, boolean createTopic) {
try {
+ logger.info("Building events publisher: " + projectId + ":" + topicName);
TopicName topic = verifyTopic(projectId, topicName, createTopic);
return Publisher.newBuilder(topic).build();
} catch (IOException | ConfigurationException e) {
@@ -51,6 +98,30 @@ private Publisher buildPublisher(String projectId, String topicName, boolean cre
}
}
+ private void closePublisher() {
+ if (publisher != null) {
+ logger.info("Stopping events publisher: " + projectId + ":" + topicId);
+ publisher.shutdown();
+ try {
+ publisher.awaitTermination(1, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void closeSubscriber() {
+ if (subscriber != null) {
+ // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
+ subscriber.startAsync();
+ try {
+ subscriber.awaitTerminated(1, TimeUnit.MINUTES);
+ } catch (TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
private TopicName verifyTopic(String projectId, String topicName, boolean createTopic)
throws IOException, ConfigurationException {
EventTopicName topicCreator = null;
@@ -61,4 +132,19 @@ private TopicName verifyTopic(String projectId, String topicName, boolean create
}
return topicCreator.verifyTopicName(topicName);
}
+
+ private ApiFutureCallback makePublishCallback(String message) {
+ return new ApiFutureCallback<>() {
+ @Override
+ public void onFailure(Throwable throwable) {
+ logger.error("Error publishing message : " + message, throwable);
+ }
+
+ @Override
+ public void onSuccess(String messageId) {
+ // Once published, returns server-assigned message ids (unique within the topic)
+ logger.info("Published message ID: " + messageId);
+ }
+ };
+ }
}
diff --git a/service/src/main/java/bio/terra/common/events/client/google/PublisherDao.java b/service/src/main/java/bio/terra/common/events/client/google/PublisherDao.java
index eef5df0..c9f5f48 100644
--- a/service/src/main/java/bio/terra/common/events/client/google/PublisherDao.java
+++ b/service/src/main/java/bio/terra/common/events/client/google/PublisherDao.java
@@ -8,7 +8,6 @@
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.io.Closeable;
-import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,14 +55,5 @@ public void onSuccess(String messageId) {
}
@Override
- public void close() {
- if (publisher != null) {
- publisher.shutdown();
- try {
- publisher.awaitTermination(1, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
+ public void close() {}
}
diff --git a/service/src/main/java/bio/terra/common/events/config/PubsubConfig.java b/service/src/main/java/bio/terra/common/events/config/PubsubConfig.java
index d9eb4a6..75c1257 100644
--- a/service/src/main/java/bio/terra/common/events/config/PubsubConfig.java
+++ b/service/src/main/java/bio/terra/common/events/config/PubsubConfig.java
@@ -37,7 +37,7 @@ public GoogleConfig googleConfig() {
}
public String nameSuffix() {
- if (beeConfig != null) {
+ if (beeConfig != null && beeConfig.isActive()) {
return beeConfig.name();
}
return null;
diff --git a/service/src/main/java/bio/terra/common/events/topics/ChartTopic.java b/service/src/main/java/bio/terra/common/events/topics/ChartTopic.java
index 9ae794f..e6f47e8 100644
--- a/service/src/main/java/bio/terra/common/events/topics/ChartTopic.java
+++ b/service/src/main/java/bio/terra/common/events/topics/ChartTopic.java
@@ -7,16 +7,14 @@
import bio.terra.common.events.topics.messages.charts.ChartMessage;
import bio.terra.common.events.topics.messages.charts.ChartUpdated;
import org.jetbrains.annotations.NotNull;
-import org.springframework.stereotype.Repository;
import org.springframework.web.util.UriComponentsBuilder;
-@Repository
-public class ChartTopic extends EventTopic {
+public abstract class ChartTopic extends EventTopic {
- private String publishedBy;
+ protected String publishedBy;
public ChartTopic(PubsubConfig config, PubsubClientFactory clientFactory) {
- super(clientFactory.createPubsubClient("charts"));
+ super(clientFactory, "charts", config.publishedBy());
publishedBy = config.publishedBy();
}
@@ -28,11 +26,6 @@ private static String buildEntityUrl(String entityId) {
.toUriString();
}
- @Override
- protected Boolean process(ChartMessage message) {
- return Boolean.TRUE;
- }
-
public void chartCreated(String entityId) {
publish(new ChartCreated(publishedBy, entityId, buildEntityUrl(entityId)));
}
diff --git a/service/src/main/java/bio/terra/common/events/topics/EventSubscriber.java b/service/src/main/java/bio/terra/common/events/topics/EventSubscriber.java
new file mode 100644
index 0000000..d32f809
--- /dev/null
+++ b/service/src/main/java/bio/terra/common/events/topics/EventSubscriber.java
@@ -0,0 +1,8 @@
+package bio.terra.common.events.topics;
+
+import bio.terra.common.events.topics.messages.EventMessage;
+
+@FunctionalInterface
+public interface EventSubscriber {
+ boolean processEvent(String jsonString);
+}
diff --git a/service/src/main/java/bio/terra/common/events/topics/EventTopic.java b/service/src/main/java/bio/terra/common/events/topics/EventTopic.java
index 360136e..1754a3b 100644
--- a/service/src/main/java/bio/terra/common/events/topics/EventTopic.java
+++ b/service/src/main/java/bio/terra/common/events/topics/EventTopic.java
@@ -1,30 +1,43 @@
package bio.terra.common.events.topics;
+import bio.terra.common.events.client.MessageProcessor;
import bio.terra.common.events.client.PubsubClient;
+import bio.terra.common.events.client.PubsubClientFactory;
import bio.terra.common.events.topics.messages.EventMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
-import java.nio.charset.StandardCharsets;
/** This class is responsible interacting with the PubsubClient (both publish and subscribe). */
public abstract class EventTopic {
private PubsubClient client;
- public EventTopic(PubsubClient client) {
- this.client = client;
+ public EventTopic(PubsubClientFactory clientFactory, String topicName, String serviceName) {
+ client = clientFactory.createPubsubClient(topicName, serviceName, this::receive);
}
public void publish(T message) {
try {
- client.publish(message.toJson().getBytes(StandardCharsets.UTF_8));
+ client.publish(message.toJson());
} catch (JsonProcessingException e) {
System.out.println("ERROR: unable to publish message");
}
}
- public void subscribe() {
- T message = null;
- process(message);
+ public void subscribe(MessageProcessor processor) {
+ client.subscribe(processor);
+ }
+
+ protected boolean receive(String message) {
+ try {
+ T msg = (T) EventMessage.fromJson(message);
+ return process(msg);
+ } catch (JsonProcessingException e) {
+ System.out.println(e.getMessage());
+ // TODO: what to do with bad messages
+ }
+
+ // TODO: this needs to be changed back to false
+ return true;
}
/**
@@ -34,5 +47,5 @@ public void subscribe() {
* @param message
* @return
*/
- protected abstract Boolean process(T message);
+ public abstract boolean process(EventMessage message);
}
diff --git a/service/src/main/java/bio/terra/common/events/topics/messages/EventFactory.java b/service/src/main/java/bio/terra/common/events/topics/messages/EventFactory.java
new file mode 100644
index 0000000..5f25dd1
--- /dev/null
+++ b/service/src/main/java/bio/terra/common/events/topics/messages/EventFactory.java
@@ -0,0 +1,3 @@
+package bio.terra.common.events.topics.messages;
+
+public class EventFactory {}
diff --git a/service/src/main/java/bio/terra/common/events/topics/messages/EventMessage.java b/service/src/main/java/bio/terra/common/events/topics/messages/EventMessage.java
index 2e3f975..4325190 100644
--- a/service/src/main/java/bio/terra/common/events/topics/messages/EventMessage.java
+++ b/service/src/main/java/bio/terra/common/events/topics/messages/EventMessage.java
@@ -19,12 +19,6 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
public class EventMessage {
- public static enum TYPES {
- CHART_CREATED,
- CHART_UPDATED,
- CHART_DELETED
- }
-
/**
* This is the current version of the EventMessage schema, as defined in the spec (see class-level
* javadoc).
@@ -49,7 +43,7 @@ public static enum TYPES {
String jobId;
@JsonProperty("event_type")
- TYPES eventType;
+ public EventTypes eventType;
@JsonProperty("entity_id")
String entityId;
@@ -95,7 +89,7 @@ protected EventMessage(
String publishedBy,
Map context,
String jobId,
- TYPES eventType,
+ EventTypes eventType,
String entityId,
String entityUrl,
Map properties) {
@@ -118,7 +112,7 @@ public EventMessage(
String publishedBy,
Map context,
String jobId,
- TYPES eventType,
+ EventTypes eventType,
String entityId,
String entityUrl,
Map properties) {
@@ -135,14 +129,22 @@ public EventMessage(
properties);
}
- public EventMessage(String publishedBy, TYPES eventType, String entityId, String entityUrl) {
+ public EventMessage(String publishedBy, EventTypes eventType, String entityId, String entityUrl) {
this(publishedBy, null, null, eventType, entityId, entityUrl, null);
}
- public String toJson() throws JsonProcessingException {
- ObjectMapper mapper = new ObjectMapper();
- var jsonMsg = mapper.writeValueAsString(this);
- return jsonMsg;
+ protected EventMessage(EventMessage event) {
+ this(
+ event.id,
+ event.version,
+ event.publishedAt,
+ event.publishedBy,
+ event.context,
+ event.jobId,
+ event.eventType,
+ event.entityId,
+ event.entityUrl,
+ event.properties);
}
public static EventMessage fromJson(String jsonMessage) throws JsonProcessingException {
@@ -151,11 +153,17 @@ public static EventMessage fromJson(String jsonMessage) throws JsonProcessingExc
return em;
}
+ public String toJson() throws JsonProcessingException {
+ ObjectMapper mapper = new ObjectMapper();
+ var jsonMsg = mapper.writeValueAsString(this);
+ return jsonMsg;
+ }
+
/**
* Ensure required parameters are set on object.
*
* @return true if object is valid, false otherwise.
- * @see #EventMessage(String,String,Date,String,Map,String,TYPES,String,String,Map)
+ * @see #EventMessage(String, String, Date, String, Map, String, EventTypes, String, String, Map)
*/
private void confirmNonNull() {
@@ -170,4 +178,10 @@ private void confirmNonNull() {
version, publishedBy, eventType, entityId, entityUrl));
}
}
+
+ public static enum EventTypes {
+ CHART_CREATED,
+ CHART_UPDATED,
+ CHART_DELETED
+ }
}
diff --git a/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartCreated.java b/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartCreated.java
index f2f1d30..d8ea20a 100644
--- a/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartCreated.java
+++ b/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartCreated.java
@@ -4,6 +4,10 @@
public class ChartCreated extends ChartMessage {
public ChartCreated(String publishedBy, String entityId, String entityUrl) {
- super(publishedBy, EventMessage.TYPES.CHART_CREATED, entityId, entityUrl);
+ super(publishedBy, EventTypes.CHART_CREATED, entityId, entityUrl);
+ }
+
+ public ChartCreated(EventMessage event) {
+ super(event);
}
}
diff --git a/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartDeleted.java b/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartDeleted.java
index 87399a9..b6b48bf 100644
--- a/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartDeleted.java
+++ b/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartDeleted.java
@@ -4,6 +4,10 @@
public class ChartDeleted extends ChartMessage {
public ChartDeleted(String publishedBy, String entityId, String entityUrl) {
- super(publishedBy, EventMessage.TYPES.CHART_DELETED, entityId, entityUrl);
+ super(publishedBy, EventTypes.CHART_DELETED, entityId, entityUrl);
+ }
+
+ public ChartDeleted(EventMessage event) {
+ super(event);
}
}
diff --git a/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartMessage.java b/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartMessage.java
index d7b7e79..42c9cda 100644
--- a/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartMessage.java
+++ b/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartMessage.java
@@ -1,10 +1,42 @@
package bio.terra.common.events.topics.messages.charts;
import bio.terra.common.events.topics.messages.EventMessage;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.Arrays;
+import java.util.List;
public abstract class ChartMessage extends EventMessage {
- public ChartMessage(String publishedBy, TYPES eventType, String entityId, String entityUrl) {
+ private static List CHART_TYPES =
+ Arrays.asList(EventTypes.CHART_CREATED, EventTypes.CHART_UPDATED, EventTypes.CHART_DELETED);
+
+ public ChartMessage(String publishedBy, EventTypes eventType, String entityId, String entityUrl) {
super(publishedBy, eventType, entityId, entityUrl);
}
+
+ public ChartMessage(EventMessage event) {
+ super(event);
+ // if (!CHART_TYPES.contains(event.eventType)) {
+ // throw new IllegalArgumentException(
+ // MessageFormat.format(
+ // "unexpected eventType({1}) while creating Chart event", event.eventType));
+ // }
+ }
+
+ public static ChartMessage fromJson(String json) throws JsonProcessingException {
+ EventMessage event = EventMessage.fromJson(json);
+ switch (event.eventType) {
+ case CHART_CREATED:
+ return new ChartCreated(event);
+ case CHART_UPDATED:
+ return new ChartUpdated(event);
+ case CHART_DELETED:
+ return new ChartDeleted(event);
+ // default:
+ // throw new IllegalStateException(
+ // MessageFormat.format("unexpected EventType while processing Chart events:
+ // {1}", json));
+ }
+ return null;
+ }
}
diff --git a/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartUpdated.java b/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartUpdated.java
index 7144310..d97d28f 100644
--- a/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartUpdated.java
+++ b/service/src/main/java/bio/terra/common/events/topics/messages/charts/ChartUpdated.java
@@ -1,7 +1,13 @@
package bio.terra.common.events.topics.messages.charts;
+import bio.terra.common.events.topics.messages.EventMessage;
+
public class ChartUpdated extends ChartMessage {
public ChartUpdated(String publishedBy, String entityId, String entityUrl) {
- super(publishedBy, TYPES.CHART_UPDATED, entityId, entityUrl);
+ super(publishedBy, EventTypes.CHART_UPDATED, entityId, entityUrl);
+ }
+
+ public ChartUpdated(EventMessage event) {
+ super(event);
}
}
diff --git a/service/src/test/java/bio/terra/common/events/topics/messages/EventMessageTest.java b/service/src/test/java/bio/terra/common/events/topics/messages/EventMessageTest.java
index fb61c0d..653f12d 100644
--- a/service/src/test/java/bio/terra/common/events/topics/messages/EventMessageTest.java
+++ b/service/src/test/java/bio/terra/common/events/topics/messages/EventMessageTest.java
@@ -20,7 +20,8 @@ public class EventMessageTest {
@Test
public void verifyToJson() throws Exception {
EventMessage em =
- new EventMessage("publisher", EventMessage.TYPES.CHART_CREATED, "entity-id", "entity-url");
+ new EventMessage(
+ "publisher", EventMessage.EventTypes.CHART_CREATED, "entity-id", "entity-url");
ObjectMapper mapper = new ObjectMapper();
validateRequiredAttributes(em, mapper.readTree(em.toJson()));
@@ -41,7 +42,8 @@ public void requiredParams_publishedBy() {
assertThrows(
IllegalArgumentException.class,
() -> {
- new EventMessage(null, EventMessage.TYPES.CHART_CREATED, "entity-id", "entity-url");
+ new EventMessage(
+ null, EventMessage.EventTypes.CHART_CREATED, "entity-id", "entity-url");
});
assertTrue(exception.getMessage().contains("publishedBy(null)"));
}
@@ -63,7 +65,8 @@ public void requiredParams_entityId() {
assertThrows(
IllegalArgumentException.class,
() -> {
- new EventMessage("publisher", EventMessage.TYPES.CHART_CREATED, null, "entity-url");
+ new EventMessage(
+ "publisher", EventMessage.EventTypes.CHART_CREATED, null, "entity-url");
});
assertTrue(exception.getMessage().contains("entityId(null)"));
}
@@ -74,7 +77,8 @@ public void requiredParams_entityUrl() {
assertThrows(
IllegalArgumentException.class,
() -> {
- new EventMessage("publisher", EventMessage.TYPES.CHART_CREATED, "entity-id", null);
+ new EventMessage(
+ "publisher", EventMessage.EventTypes.CHART_CREATED, "entity-id", null);
});
assertTrue(exception.getMessage().contains("entityUrl(null)"));
}