From 4460c676120546d8bc9ddb7b3a94d65563f90554 Mon Sep 17 00:00:00 2001 From: rishtigupta <127137312+rishtigupta@users.noreply.github.com> Date: Thu, 14 Dec 2023 11:41:37 -0800 Subject: [PATCH] feat: add topic example (#341) --- examples/settings.gradle.kts | 1 + examples/topic/build.gradle.kts | 52 ++++++++ .../momento/client/example/TopicExample.java | 123 ++++++++++++++++++ examples/topic/src/main/resources/logback.xml | 17 +++ .../java/momento/sdk/SubscriptionWrapper.java | 7 +- 5 files changed, 194 insertions(+), 6 deletions(-) create mode 100644 examples/topic/build.gradle.kts create mode 100644 examples/topic/src/main/java/momento/client/example/TopicExample.java create mode 100644 examples/topic/src/main/resources/logback.xml diff --git a/examples/settings.gradle.kts b/examples/settings.gradle.kts index 8417a5dc..803eadb1 100644 --- a/examples/settings.gradle.kts +++ b/examples/settings.gradle.kts @@ -24,3 +24,4 @@ include("cache") include("cache-with-aws") include("lambda:docker") include("token") +include("topic") diff --git a/examples/topic/build.gradle.kts b/examples/topic/build.gradle.kts new file mode 100644 index 00000000..76c18f28 --- /dev/null +++ b/examples/topic/build.gradle.kts @@ -0,0 +1,52 @@ +/* + * This file was generated by the Gradle 'init' task. + * + * This generated file contains a sample Java library project to get you started. + * For more details take a look at the 'Building Java & JVM projects' chapter in the Gradle + * User Manual available at https://docs.gradle.org/7.2/userguide/building_java_projects.html + */ + +plugins { + application + id("com.diffplug.spotless") version "5.15.1" +} + +repositories { + // Use Maven Central for resolving dependencies. + mavenCentral() +} + +dependencies { + implementation("software.momento.java:sdk:1.7.0") + + implementation("com.google.guava:guava:31.1-android") + + // Logging framework to log and enable logging in the Momento client. + implementation("ch.qos.logback:logback-classic:1.4.7") + + // Histogram for collecting stats in the load generator + implementation("org.hdrhistogram:HdrHistogram:2.1.12") + + // Use JUnit Jupiter for testing. + testImplementation("org.junit.jupiter:junit-jupiter:5.9.2") +} + +spotless { + java { + removeUnusedImports() + googleJavaFormat("1.11.0") + } +} + +tasks.test { + // Use JUnit Platform for unit tests. + useJUnitPlatform() +} + +task("topic", JavaExec::class) { + description = "Run the disposable token example" + classpath = sourceSets.main.get().runtimeClasspath + mainClass.set("momento.client.example.TopicExample") +} + +task("prepareKotlinBuildScriptModel") {} diff --git a/examples/topic/src/main/java/momento/client/example/TopicExample.java b/examples/topic/src/main/java/momento/client/example/TopicExample.java new file mode 100644 index 00000000..1a6ce108 --- /dev/null +++ b/examples/topic/src/main/java/momento/client/example/TopicExample.java @@ -0,0 +1,123 @@ +package momento.client.example; + +import java.time.Duration; +import momento.sdk.CacheClient; +import momento.sdk.ISubscriptionCallbacks; +import momento.sdk.TopicClient; +import momento.sdk.auth.CredentialProvider; +import momento.sdk.auth.EnvVarCredentialProvider; +import momento.sdk.config.Configurations; +import momento.sdk.config.TopicConfigurations; +import momento.sdk.exceptions.AlreadyExistsException; +import momento.sdk.exceptions.SdkException; +import momento.sdk.responses.cache.control.CacheCreateResponse; +import momento.sdk.responses.topic.TopicMessage; +import momento.sdk.responses.topic.TopicPublishResponse; +import momento.sdk.responses.topic.TopicSubscribeResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicExample { + + private static final String API_KEY_ENV_VAR = "MOMENTO_API_KEY"; + private static final Duration DEFAULT_ITEM_TTL = Duration.ofSeconds(60); + + private static final String CACHE_NAME = "topic-example-cache"; + private static final String TOPIC_NAME = "example-topic"; + + private static final Logger logger = LoggerFactory.getLogger(TopicExample.class); + + public static void main(String[] args) { + logStartBanner(); + + final CredentialProvider credentialProvider; + try { + credentialProvider = new EnvVarCredentialProvider(API_KEY_ENV_VAR); + } catch (SdkException e) { + logger.error("Unable to load credential from environment variable " + API_KEY_ENV_VAR, e); + throw e; + } + + try (final CacheClient client = + CacheClient.create( + credentialProvider, Configurations.Laptop.latest(), DEFAULT_ITEM_TTL); + final TopicClient topicClient = + TopicClient.create(credentialProvider, TopicConfigurations.Laptop.latest())) { + + // Create a cache + final CacheCreateResponse createResponse = client.createCache(CACHE_NAME).join(); + if (createResponse instanceof CacheCreateResponse.Error error) { + if (error.getCause() instanceof AlreadyExistsException) { + logger.info("Cache with name '{}' already exists.", CACHE_NAME); + } else { + logger.error("Cache creation failed with error " + error.getErrorCode(), error); + } + } + + // Subscribe to a topic + TopicSubscribeResponse.Subscription subscription = subscribeToTopic(topicClient); + + // Publish messages to the topic + for (int i = 0; i < 100; i++) { + publishToTopic(topicClient, "message " + i); + Thread.sleep(1000); + } + + subscription.unsubscribe(); + + } catch (Exception e) { + logger.error("An unexpected error occurred", e); + throw new RuntimeException(e); + } + + logEndBanner(); + } + + private static TopicSubscribeResponse.Subscription subscribeToTopic(TopicClient topicClient) { + final TopicSubscribeResponse subscribeResponse = + topicClient + .subscribe( + TopicExample.CACHE_NAME, + TOPIC_NAME, + new ISubscriptionCallbacks() { + @Override + public void onItem(TopicMessage message) { + logger.info("Received message on topic {}: {}", TOPIC_NAME, message.toString()); + } + + @Override + public void onError(Throwable error) { + logger.error("Subscription to topic {} failed with error", TOPIC_NAME, error); + } + + @Override + public void onCompleted() { + logger.info("Subscription to topic {} completed", TOPIC_NAME); + } + }) + .join(); + return subscribeResponse.orElseThrow( + () -> new RuntimeException("Unable to subscribe to topic " + TOPIC_NAME)); + } + + private static void publishToTopic(TopicClient topicClient, String message) { + final TopicPublishResponse publishResponse = + topicClient.publish(TopicExample.CACHE_NAME, TOPIC_NAME, message).join(); + if (publishResponse instanceof TopicPublishResponse.Error error) { + logger.error( + "Topic {} publish failed with error {}", TOPIC_NAME, error.getErrorCode(), error); + } + } + + private static void logStartBanner() { + logger.info("******************************************************************"); + logger.info("Example Start"); + logger.info("******************************************************************"); + } + + private static void logEndBanner() { + logger.info("******************************************************************"); + logger.info("Example End"); + logger.info("******************************************************************"); + } +} diff --git a/examples/topic/src/main/resources/logback.xml b/examples/topic/src/main/resources/logback.xml new file mode 100644 index 00000000..29d89338 --- /dev/null +++ b/examples/topic/src/main/resources/logback.xml @@ -0,0 +1,17 @@ + + + + + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} -%kvp- %msg%n + + + + + + + diff --git a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java index 98b0a5e1..933ed6d4 100644 --- a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java +++ b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java @@ -201,11 +201,6 @@ public void close() { if (subscription != null) { subscription.onCompleted(); } - if (scheduler != null) { - scheduler.shutdown(); - } - if (grpcManager != null) { - grpcManager.close(); - } + scheduler.shutdown(); } }