From 39c4d05fec2d3fb38ce5cb80bd32f67c4975f530 Mon Sep 17 00:00:00 2001 From: Steven Hartley Date: Tue, 16 Jul 2024 17:14:20 -0400 Subject: [PATCH] =?UTF-8?q?Move=20InMemorySubscriber=20from=20L2=20communi?= =?UTF-8?q?cation=20module=20to=20=20L3=20client=20mo=E2=80=A6=20(#158)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The InMemorySubscriber actually was implementing the client-side of the uSubscription flow by talking to usubscription service, registering a listener (to receive published messages), and setting up a notifier to receive subscription changes. The only L2 item for pub/sub (subscriber) flow was to register a listener with the transport so there is no need to add a wrapper to do that. This change then also implements all the other remaining usubscription client side APIs for developers to use and removes the L2 Subscriber to avoid confusion with this InMemoryUSubscriptionClient implementation. The uProtocol client-side implementations will now reside in the client folder of up-java (ex. uDiscovery & uTwin). --- README.adoc | 33 +- pom.xml | 3 +- .../org/eclipse/uprotocol/client/README.adoc | 17 + .../v3/InMemoryUSubscriptionClient.java} | 193 ++++++- .../v3}/SubscriptionChangeHandler.java | 2 +- .../usubscription/v3/USubscriptionClient.java | 263 +++++++++ .../uprotocol/communication/Subscriber.java | 123 ---- .../uprotocol/communication/UClient.java | 24 +- .../uprotocol/transport/UTransport.java | 5 +- .../v3/InMemoryUSubscriptionClientTest.java} | 544 +++++++++++++++++- .../uprotocol/communication/UClientTest.java | 15 - 11 files changed, 1005 insertions(+), 217 deletions(-) create mode 100644 src/main/java/org/eclipse/uprotocol/client/README.adoc rename src/main/java/org/eclipse/uprotocol/{communication/InMemorySubscriber.java => client/usubscription/v3/InMemoryUSubscriptionClient.java} (54%) rename src/main/java/org/eclipse/uprotocol/{communication => client/usubscription/v3}/SubscriptionChangeHandler.java (95%) create mode 100644 src/main/java/org/eclipse/uprotocol/client/usubscription/v3/USubscriptionClient.java delete mode 100644 src/main/java/org/eclipse/uprotocol/communication/Subscriber.java rename src/test/java/org/eclipse/uprotocol/{communication/InMemorySubscriberTest.java => client/usubscription/v3/InMemoryUSubscriptionClientTest.java} (58%) diff --git a/README.adoc b/README.adoc index fcee0e7e..46ffbac7 100644 --- a/README.adoc +++ b/README.adoc @@ -3,7 +3,9 @@ == Overview -This library implements the https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/languages.adoc[uProtocol Language Specific Library Requirements] for Java defined in https://github.com/eclipse-uprotocol/uprotocol-spec/tree/main[uProtocol Specifications]. The library is organized into packages that are described in <> below. Each package contains a README.adoc file that describes the purpose of the package and how to use it. +This library implements the https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/languages.adoc[uProtocol Language Specific Library Requirements] for Java defined in https://github.com/eclipse-uprotocol/uprotocol-spec/tree/main[uProtocol Specifications]. The library is organized into packages that are described in <> below and organized by the layers of the protocol. + +Each package contains a README.adoc file that describes the purpose of the package and how to use it. The module contains the factory methods, serializers, and validators for all data types defined in the specifications, and any data models that either haven't or couldn't be defined in up-core-api yet. @@ -25,26 +27,35 @@ To pull the Library from maven central, setting ${uprotocol.version} to the late === Using The Library .SDK Packages -[#sdk-packages,width=100%,cols="20%,80%",options="header"] +[#sdk-packages,width=100%,cols="1,2,5",options="header"] |=== -| Package | Purpose +| Package | Protocol Layer | Purpose + +| xref:src/main/java/org/eclipse/uprotocol/communication/README.adoc[`*client*`] +| https://github.com/eclipse-uprotocol/up-spec/tree/main/up-l3[Application Layer (uP-L3)] +| Top level client-facing interfaces to communication with USubscription, UDiscovery, and UTwin services. | xref:src/main/java/org/eclipse/uprotocol/communication/README.adoc[`*communication*`] -| Top level client-facing https://github.com/eclipse-uprotocol/up-spec/tree/main/up-l2[communication layer (uP-L2)] interfaces that applications and services use to implement the publisher/subscriber, notification, and RPC patterns on top of the transport layer (uTransport) API. +| https://github.com/eclipse-uprotocol/up-spec/tree/main/up-l2[communication layer (uP-L2)] +| Common implementation of communication messaging patterns (publisher, subscriber, Rpcclient, RpcServer, etc..) tha t is build on top of the L1 transport interface (see below) -| link:src/main/java/org/eclipse/uprotocol/uri/README.adoc[`*uuri*`] -| Builders, validators, and serializers for uProtocol addressing scheme (UUri). +| link:src/main/java/org/eclipse/uprotocol/transport/README.adoc[`*transport*`] +| https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/up-l1/README.adoc[Transport Layer (uP-L1)] +| Interface and data model for how to send() and receive() messages in a common way across various transport technologies (ex. zenoh, mqtt, http, etc...). the interface is implemented by transports (ex. up-transport-android-java), and the interface is then used to build the uProtocol layer 2 communication layer implementation. +| link:src/main/java/org/eclipse/uprotocol/uri/README.adoc[`*uuri*`] +| Basics +| uProtocol addressing scheme (UUri) builders, validators, and serializers. -| link:src/main/java/org/eclipse/uprotocol/uuid/README.adoc[`*uuid*`] -| Identifier used to uniquely identify (and timestamp) messages that are sent -| link:src/main/java/org/eclipse/uprotocol/transport/README.adoc[`*transport*`] -| https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/up-l1/README.adoc[uP-L1 Transport Layer] interface and data model that wraps communication middlewares like zenoh, mqtt, http, etc... into a thin and simple to use transport interface. This model is used by the communication layer (uP-L2) to send and receive messages and transports are expected to implement the link:src/main/java/org/eclipse/uprotocol/transport/UTransport.java[uTransport] interface. +| link:src/main/java/org/eclipse/uprotocol/uuid/README.adoc[`*uuid*`] +| Basics +| uProtocol unique identifier builders, validators, and serializers. | link:src/main/java/org/eclipse/uprotocol/cloudevent/README.adoc[`*cloudevent*`] -| A representation of uProtocol messages used in older versions of the specifications and kept for backwards compatibility. +| Legacy +| A representation of uProtocol messages used in older versions of the specifications kept for backwards compatibility. |=== diff --git a/pom.xml b/pom.xml index 461fbc0a..95d06069 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ up-java Java Library for uProtocol Language specific uProtocol library for building and using UUri, UUID, UAttributes, UTransport, and more - 0.1.12-SNAPSHOT + 0.2.0-SNAPSHOT jar https://github.com/eclipse-uprotocol/up-java/ @@ -241,6 +241,7 @@ **/uri/** **/uuid/** **/validation/** + **/client/** diff --git a/src/main/java/org/eclipse/uprotocol/client/README.adoc b/src/main/java/org/eclipse/uprotocol/client/README.adoc new file mode 100644 index 00000000..7ee6dd1a --- /dev/null +++ b/src/main/java/org/eclipse/uprotocol/client/README.adoc @@ -0,0 +1,17 @@ +# Application Layer APIs (uP-L3 Interface) + +The following module includes the client-facing https://github.com/eclipse-uprotocol/up-spec/tree/main/up-l3[Application Layer (uP-L3)] interfaces to communication with USubscription, UDiscovery, and UTwin services. + + +## uP-L3 Interfaces + +.Interfaces (uP-L3 Interface) +[cols="1,1,3",options="header"] +|=== +| Interface | Implementation(s) | Description + +| xref:usubscription/v3/USubscriptionClient.java[*USubscriptionClient*] | xref:usubscription/v3/InMemoryUSubscriptionClient.java[InMemoryUSubscriptionClient] | Subscription Management APIs to subscribe(), unsubscribe() and fetch information from the subscription database. +|=== + + +The module includes the interface for the client-facing APIs as well as a simple in-memory implementation that is based on the uP-L2 in-memory implementations. the term in-memory is used to indicate that the data required by the code is cached inside of the object and not persisted to a given database backend, this design is useful for embedded applications (i.e. in the vehicle) however will not scale to the multi-tenanted cloud applications. \ No newline at end of file diff --git a/src/main/java/org/eclipse/uprotocol/communication/InMemorySubscriber.java b/src/main/java/org/eclipse/uprotocol/client/usubscription/v3/InMemoryUSubscriptionClient.java similarity index 54% rename from src/main/java/org/eclipse/uprotocol/communication/InMemorySubscriber.java rename to src/main/java/org/eclipse/uprotocol/client/usubscription/v3/InMemoryUSubscriptionClient.java index 96774e8a..819e7571 100644 --- a/src/main/java/org/eclipse/uprotocol/communication/InMemorySubscriber.java +++ b/src/main/java/org/eclipse/uprotocol/client/usubscription/v3/InMemoryUSubscriptionClient.java @@ -10,7 +10,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.eclipse.uprotocol.communication; +package org.eclipse.uprotocol.client.usubscription.v3; import java.util.Objects; import java.util.Optional; @@ -19,6 +19,21 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Logger; +import org.eclipse.uprotocol.communication.CallOptions; +import org.eclipse.uprotocol.communication.InMemoryRpcClient; +import org.eclipse.uprotocol.communication.Notifier; +import org.eclipse.uprotocol.communication.RpcClient; +import org.eclipse.uprotocol.communication.RpcMapper; +import org.eclipse.uprotocol.communication.SimpleNotifier; +import org.eclipse.uprotocol.communication.UPayload; +import org.eclipse.uprotocol.communication.UStatusException; +import org.eclipse.uprotocol.core.usubscription.v3.FetchSubscribersRequest; +import org.eclipse.uprotocol.core.usubscription.v3.FetchSubscribersResponse; +import org.eclipse.uprotocol.core.usubscription.v3.FetchSubscriptionsRequest; +import org.eclipse.uprotocol.core.usubscription.v3.FetchSubscriptionsResponse; +import org.eclipse.uprotocol.core.usubscription.v3.NotificationsRequest; +import org.eclipse.uprotocol.core.usubscription.v3.NotificationsResponse; +import org.eclipse.uprotocol.core.usubscription.v3.SubscribeAttributes; import org.eclipse.uprotocol.core.usubscription.v3.SubscriberInfo; import org.eclipse.uprotocol.core.usubscription.v3.SubscriptionRequest; import org.eclipse.uprotocol.core.usubscription.v3.SubscriptionResponse; @@ -39,42 +54,56 @@ import com.google.protobuf.Descriptors.ServiceDescriptor; /** - * The following is an in-memory implementation of the {@link Subscriber} interface that - * wraps the {@link UTransport} for implementing the Subscriber-side of the pub/sub - * messaging pattern to allow developers to subscribe and unsubscribe to topics. This - * implementation uses the {@link InMemoryRpcClient} and {@link SimpleNotifier} interfaces - * to invoke the subscription request message to the usubscription service, and register - * to receive notifications for changes from the uSubscription service. - * + * Implementation of USubscriptionClient that caches state information within the object + * and used for single tenant applications (ex. in-vehicle). The implementation uses {@link InMemoryRpcClient} + * that also stores RPC corelation information within the objects */ -public class InMemorySubscriber implements Subscriber { - +public class InMemoryUSubscriptionClient implements USubscriptionClient { private final UTransport transport; private final RpcClient rpcClient; private final Notifier notifier; private static final ServiceDescriptor USUBSCRIPTION = USubscriptionProto.getDescriptor().getServices().get(0); - // TODO: The following items need to be pulled from generated code + // TODO: The following items eventually need to be pulled from generated code private static final UUri SUBSCRIBE_METHOD = UriFactory.fromProto(USUBSCRIPTION, 1); private static final UUri UNSUBSCRIBE_METHOD = UriFactory.fromProto(USUBSCRIPTION, 2); - private static final UUri NOTIFICATION_TOPIC = UriFactory.fromProto(USUBSCRIPTION, 0x8000); + private static final UUri FETCH_SUBSCRIBERS_METHOD = UriFactory.fromProto(USUBSCRIPTION, 8); + private static final UUri FETCH_SUBSCRIPTIONS_METHOD = UriFactory.fromProto(USUBSCRIPTION, 3); + private static final UUri REGISTER_NOTIFICATIONS_METHOD = UriFactory.fromProto(USUBSCRIPTION, 6); + private static final UUri UNREGISTER_NOTIFICATIONS_METHOD = UriFactory.fromProto(USUBSCRIPTION, 7); + + private static final UUri NOTIFICATION_TOPIC = UriFactory.fromProto(USUBSCRIPTION, 0x8000); + // Map to store subscription change notification handlers private final ConcurrentHashMap mHandlers = new ConcurrentHashMap<>(); // transport Notification listener that will process subscription change notifications private final UListener mNotificationListener = this::handleNotifications; - + /** - * Creates a new subscriber for existing Communication Layer client implementations. + * Creates a new USubscription client passing {@link UTransport} and {@link CallOptions} + * used to provide additional options for the RPC requests to uSubscription service. + * + * @param transport the transport to use for sending the notifications + * @param options the call options to use for the RPC requests + */ + public InMemoryUSubscriptionClient (UTransport transport) { + this(transport, new InMemoryRpcClient(transport), new SimpleNotifier(transport)); + } + + + /** + * Creates a new USubscription client passing {@link UTransport}, {@link CallOptions}, + * and an implementation of {@link RpcClient} and {@link Notifier}. * * @param transport the transport to use for sending the notifications * @param rpcClient the rpc client to use for sending the RPC requests * @param notifier the notifier to use for registering the notification listener */ - public InMemorySubscriber (UTransport transport, RpcClient rpcClient, Notifier notifier) { + public InMemoryUSubscriptionClient (UTransport transport, RpcClient rpcClient, Notifier notifier) { Objects.requireNonNull(transport, UTransport.TRANSPORT_NULL_ERROR); Objects.requireNonNull(rpcClient, "RpcClient missing"); Objects.requireNonNull(notifier, "Notifier missing"); @@ -98,8 +127,8 @@ public InMemorySubscriber (UTransport transport, RpcClient rpcClient, Notifier n * subscribed to said topic. * * @param topic The topic to subscribe to. - * @param listener The listener to be called when a message is received on the topic. - * @param options The call options for the subscription. + * @param listener The listener to be called when messages are received. + * @param options The {@link CallOptions} to be used for the subscription. * @param handler {@link SubscriptionChangeHandler} to handle changes to subscription states. * @return Returns the CompletionStage with {@link SubscriptionResponse} or exception with the failure * reason as {@link UStatus}. {@link UCode.ALREADY_EXISTS} will be returned if you call this API multiple @@ -110,7 +139,7 @@ public CompletionStage subscribe(UUri topic, UListener lis SubscriptionChangeHandler handler) { Objects.requireNonNull(topic, "Subscribe topic missing"); Objects.requireNonNull(listener, "Request listener missing"); - options = Objects.requireNonNullElse(options, CallOptions.DEFAULT); + Objects.requireNonNull(options, "CallOptions missing"); final SubscriptionRequest request = SubscriptionRequest.newBuilder() .setTopic(topic) @@ -161,14 +190,15 @@ public CompletionStage subscribe(UUri topic, UListener lis * service, the listener and handler (if any) will remain registered. * * @param topic The topic to unsubscribe to. - * @param listener The listener to be called when a message is received on the topic. - * @param options The call options for the subscription. + * @param listener The listener to be called when messages are received. + * @param options The {@link CallOptions} to be used for the unsubscribe request. * @return Returns {@link UStatus} with the result from the unsubscribe request. */ @Override public CompletionStage unsubscribe(UUri topic, UListener listener, CallOptions options) { Objects.requireNonNull(topic, "Unsubscribe topic missing"); Objects.requireNonNull(listener, "listener missing"); + Objects.requireNonNull(options, "CallOptions missing"); final UnsubscribeRequest unsubscribeRequest = UnsubscribeRequest.newBuilder().setTopic(topic).build(); @@ -190,13 +220,13 @@ public CompletionStage unsubscribe(UUri topic, UListener listener, Call /** - * Unregisters a listener and removes any registered {@link SubscriptionChangeHandler} for the topic. + * Unregister the listener and removes any registered {@link SubscriptionChangeHandler} for the topic. * * This method is used to remove handlers/listeners without notifying the uSubscription service * so that we can be persistently subscribed even when the uE is not running. * * @param topic The topic to subscribe to. - * @param listener The listener to be called when a message is received on the topic. + * @param listener The listener to be called when messages are received. * @return Returns {@link UStatus} with the status of the listener unregister request. */ @Override @@ -207,6 +237,9 @@ public CompletionStage unregisterListener(UUri topic, UListener listene .whenComplete((status, exception) -> mHandlers.remove(topic)); } + /** + * Close the subscription client and clean up resources. + */ public void close() { mHandlers.clear(); notifier.unregisterNotificationListener(NOTIFICATION_TOPIC, mNotificationListener) @@ -214,6 +247,122 @@ public void close() { } + /** + * Register for Subscription Change Notifications. + * + * This API allows producers to register to receive subscription change notifications for + * topics that they produce only. + * + * @param topic The topic to register for notifications. + * @param handler The {@link SubscriptionChangeHandler} to handle the subscription changes. + * @param options The {@link CallOptions} to be used for the register request. + * @return {@link CompletionStage} completed successfully if uSubscription service accepts the + * request to register the caller to be notified of subscription changes, or + * the CompletionStage completes exceptionally with {@link UStatus} that indicates + * the failure reason. + */ + @Override + public CompletionStage registerForNotifications(UUri topic, + SubscriptionChangeHandler handler, CallOptions options) { + Objects.requireNonNull(topic, "Topic missing"); + Objects.requireNonNull(handler, "Handler missing"); + Objects.requireNonNull(options, "CallOptions missing"); + + NotificationsRequest request = NotificationsRequest.newBuilder() + .setTopic(topic) + .setSubscriber(SubscriberInfo.newBuilder().setUri(transport.getSource()).build()) + .build(); + + return RpcMapper.mapResponse(rpcClient.invokeMethod(REGISTER_NOTIFICATIONS_METHOD, + UPayload.pack(request), options), NotificationsResponse.class) + // Then Add the handler (if the client provided one) so the client can be notified of + // changes to the subscription state. + .whenComplete( (response, exception) -> { + if (exception == null) { + mHandlers.compute(topic, (k, existingHandler) -> { + if (existingHandler != null && existingHandler != handler) { + throw new UStatusException(UCode.ALREADY_EXISTS, "Handler already registered"); + } + return handler; + }); + } + }); + } + + + /** + * Unregister for subscription change notifications. + * + * @param topic The topic to unregister for notifications. + * @param handler The {@link SubscriptionChangeHandler} to handle the subscription changes. + * @param options The {@link CallOptions} to be used for the unregister request. + * @return {@link CompletionStage} completed successfully with {@link NotificationResponse} with + * the status of the API call to uSubscription service, or completed unsuccessfully with + * {@link UStatus} with the reason for the failure. {@link UCode.PERMISSION_DENIED} is + * returned if the topic ue_id does not equal the callers ue_id. + */ + @Override + public CompletionStage unregisterForNotifications(UUri topic, + SubscriptionChangeHandler handler, CallOptions options) { + Objects.requireNonNull(topic, "Topic missing"); + Objects.requireNonNull(handler, "Handler missing"); + Objects.requireNonNull(options, "CallOptions missing"); + + NotificationsRequest request = NotificationsRequest.newBuilder() + .setTopic(topic) + .setSubscriber(SubscriberInfo.newBuilder().setUri(transport.getSource()).build()) + .build(); + + return RpcMapper.mapResponse(rpcClient.invokeMethod(UNREGISTER_NOTIFICATIONS_METHOD, + UPayload.pack(request), options), NotificationsResponse.class) + .whenComplete((response, exception) -> mHandlers.remove(topic)); + } + + + /** + * Fetch the list of subscribers for a given produced topic. + * + * @param topic The topic to fetch the subscribers for. + * @param options The {@link CallOptions} to be used for the fetch request. + * @return {@link CompletionStage} completed successfully with {@link FetchSubscribersResponse} with + * the list of subscribers, or completed unsuccessfully with {@link UStatus} with the reason + * for the failure. + */ + @Override + public CompletionStage fetchSubscribers(UUri topic, CallOptions options) { + Objects.requireNonNull(topic, "Topic missing"); + Objects.requireNonNull(options, "CallOptions missing"); + + FetchSubscribersRequest request = FetchSubscribersRequest.newBuilder().setTopic(topic).build(); + return RpcMapper.mapResponse(rpcClient.invokeMethod(FETCH_SUBSCRIBERS_METHOD, + UPayload.pack(request), options), FetchSubscribersResponse.class); + } + + + /** + * Fetch list of Subscriptions for a given topic. + * + * API provides more information than {@code fetchSubscribers()} in that it also returns + * {@link SubscribeAttributes} per subscriber that might be useful to the producer to know. + * + * @param topic The topic to fetch subscriptions for. + * @param options The {@link CallOptions} to be used for the request. + * @return {@link CompletionStage} completed successfully with {@link FetchSubscriptionsResponse} that + * contains the subscription information per subscriber to the topic or completed unsuccessfully with + * {@link UStatus} with the reason for the failure. {@link UCode.PERMISSION_DENIED} is returned if the + * topic ue_id does not equal the callers ue_id. + */ + @Override + public CompletionStage fetchSubscriptions(FetchSubscriptionsRequest request, + CallOptions options) { + Objects.requireNonNull(request, "Request missing"); + Objects.requireNonNull(options, "CallOptions missing"); + + return RpcMapper.mapResponse(rpcClient.invokeMethod(FETCH_SUBSCRIPTIONS_METHOD, + UPayload.pack(request), options), FetchSubscriptionsResponse.class); + } + + /** * Handles incoming notifications from the USubscription service. * diff --git a/src/main/java/org/eclipse/uprotocol/communication/SubscriptionChangeHandler.java b/src/main/java/org/eclipse/uprotocol/client/usubscription/v3/SubscriptionChangeHandler.java similarity index 95% rename from src/main/java/org/eclipse/uprotocol/communication/SubscriptionChangeHandler.java rename to src/main/java/org/eclipse/uprotocol/client/usubscription/v3/SubscriptionChangeHandler.java index cafd1e67..0bf53e4d 100644 --- a/src/main/java/org/eclipse/uprotocol/communication/SubscriptionChangeHandler.java +++ b/src/main/java/org/eclipse/uprotocol/client/usubscription/v3/SubscriptionChangeHandler.java @@ -10,7 +10,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.eclipse.uprotocol.communication; +package org.eclipse.uprotocol.client.usubscription.v3; import org.eclipse.uprotocol.core.usubscription.v3.SubscriptionStatus; import org.eclipse.uprotocol.v1.UUri; diff --git a/src/main/java/org/eclipse/uprotocol/client/usubscription/v3/USubscriptionClient.java b/src/main/java/org/eclipse/uprotocol/client/usubscription/v3/USubscriptionClient.java new file mode 100644 index 00000000..d64c12ed --- /dev/null +++ b/src/main/java/org/eclipse/uprotocol/client/usubscription/v3/USubscriptionClient.java @@ -0,0 +1,263 @@ +/** + * SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.eclipse.uprotocol.client.usubscription.v3; + +import java.util.concurrent.CompletionStage; + +import org.eclipse.uprotocol.communication.CallOptions; +import org.eclipse.uprotocol.core.usubscription.v3.FetchSubscribersResponse; +import org.eclipse.uprotocol.core.usubscription.v3.FetchSubscriptionsRequest; +import org.eclipse.uprotocol.core.usubscription.v3.FetchSubscriptionsResponse; +import org.eclipse.uprotocol.core.usubscription.v3.NotificationsResponse; +import org.eclipse.uprotocol.core.usubscription.v3.SubscriptionResponse; +import org.eclipse.uprotocol.core.usubscription.v3.SubscriptionStatus; +import org.eclipse.uprotocol.transport.UListener; +import org.eclipse.uprotocol.v1.UCode; +import org.eclipse.uprotocol.v1.UStatus; +import org.eclipse.uprotocol.v1.UUri; + +/** + * The Client-side interface for communicating with the USubscription service. + */ +public interface USubscriptionClient { + + /** + * Subscribes to a given topic. + * + * The API will return a {@link CompletionStage} with the response {@link SubscriptionResponse} or exception + * with {@link UStatusException} containing the reason for the failure. + * + * @param topic The topic to subscribe to. + * @param listener The listener to be called when a message is received on the topic. + * @return Returns the CompletionStage with {@link SubscriptionResponse} or exception with the failure. + */ + default CompletionStage subscribe(UUri topic, UListener listener) { + return subscribe(topic, listener, CallOptions.DEFAULT); + } + + /** + * Subscribes to a given topic. + * + * The API will return a {@link CompletionStage} with the response {@link SubscriptionResponse} or exception + * with {@link UStatusException} containing the reason for the failure. + * + * @param topic The topic to subscribe to. + * @param listener The listener to be called when a message is received on the topic. + * @param options The {@link CallOptions} to be used for the subscription. + * @return Returns the CompletionStage with {@link SubscriptionResponse} or exception with the failure. + */ + default CompletionStage subscribe(UUri topic, UListener listener, CallOptions options) { + return subscribe(topic, listener, options, null); + } + + + /** + * Subscribes to a given topic. + * + * The API will return a {@link CompletionStage} with the response {@link SubscriptionResponse} or exception + * with the failure if the subscription was not successful. The optional passed {@link SubscriptionChangeHandler} + * is used to receive notifications of changes to the subscription status like a transition from + * {@link SubscriptionStatus.State.SUBSCRIBE_PENDING} to {@link SubscriptionStatus.State.SUBSCRIBED} that + * occurs when we subscribe to remote topics that the device we are on has not yet a subscriber that has + * subscribed to said topic. + * + * @param topic The topic to subscribe to. + * @param listener The listener to be called when a messages are received. + * @param options The {@link CallOptions} to be used for the subscription. + * @param handler {@link SubscriptionChangeHandler} to handle changes to subscription states. + * @return Returns the CompletionStage with {@link SubscriptionResponse} or exception with the failure + * reason as {@link UStatus}. {@link UCode.ALREADY_EXISTS} will be returned if you call this API multiple + * times passing a different handler. + */ + CompletionStage subscribe(UUri topic, UListener listener, CallOptions options, + SubscriptionChangeHandler handler); + + + /** + * Unsubscribes from a given topic. + * + * The subscriber no longer wishes to be subscribed to said topic so we issue a unsubscribe + * request to the USubscription service. The API will return a {@link CompletionStage} with the + * {@link UStatus} of the result. If we are unable to unsubscribe to the topic with USubscription + * service, the listener and handler (if any) will remain registered. + * + * @param topic The topic to unsubscribe to. + * @param listener The listener to be called when a message is received on the topic. + * @return Returns {@link UStatus} with the result from the unsubscribe request. + */ + default CompletionStage unsubscribe(UUri topic, UListener listener) { + return unsubscribe(topic, listener, CallOptions.DEFAULT); + } + + /** + * Unsubscribes from a given topic. + * + * The subscriber no longer wishes to be subscribed to said topic so we issue a unsubscribe + * request to the USubscription service. The API will return a {@link CompletionStage} with the + * {@link UStatus} of the result. If we are unable to unsubscribe to the topic with USubscription + * service, the listener and handler (if any) will remain registered. + * + * @param topic The topic to unsubscribe to. + * @param listener The listener to be called when a message is received on the topic. + * @param options The {@link CallOptions} to be used for the unsubscribe request. + * @return Returns {@link UStatus} with the result from the unsubscribe request. + */ + CompletionStage unsubscribe(UUri topic, UListener listener, CallOptions options); + + + /** + * Unregister a listener and removes any registered {@link SubscriptionChangeHandler} for the topic. + * + * This method is used to remove handlers/listeners without notifying the uSubscription service + * so that we can be persistently subscribed even when the uE is not running. + * + * @param topic The topic to subscribe to. + * @param listener The listener to be called when a message is received on the topic. + * @return Returns {@link UStatus} with the status of the listener unregister request. + */ + CompletionStage unregisterListener(UUri topic, UListener listener); + + + /** + * Register for Subscription Change Notifications. + * + * This API allows producers to register to receive subscription change notifications for + * topics that they produce only. + * + * NOTE: Subscribers are automatically registered to receive notifications when they call + * {@code subscribe()} API passing a {@link SubscriptionChangeHandler} so they do not need to + * call this API. + * + * @param topic The topic to register for notifications. + * @param handler The {@link SubscriptionChangeHandler} to handle the subscription changes. + * @return {@link CompletionStage} completed successfully if uSubscription service accepts the + * request to register the caller to be notified of subscription changes, or + * the CompletionStage completes exceptionally with {@link UStatus} that indicates + * the failure reason. + */ + default CompletionStage registerForNotifications(UUri topic, + SubscriptionChangeHandler handler) { + return registerForNotifications(topic, handler, CallOptions.DEFAULT); + } + + + /** + * Register for Subscription Change Notifications. + * + * This API allows producers to register to receive subscription change notifications for + * topics that they produce only. + * + * NOTE: Subscribers are automatically registered to receive notifications when they call + * {@code subscribe()} API passing a {@link SubscriptionChangeHandler} so they do not need to + * call this API. + * + * @param topic The topic to register for notifications. + * @param handler The {@link SubscriptionChangeHandler} to handle the subscription changes. + * @param options The {@link CallOptions} to be used for the request. + * @return {@link CompletionStage} completed successfully if uSubscription service accepts the + * request to register the caller to be notified of subscription changes, or + * the CompletionStage completes exceptionally with {@link UStatus} that indicates + * the failure reason. + */ + CompletionStage registerForNotifications(UUri topic, + SubscriptionChangeHandler handler, CallOptions options); + + + /** + * Unregister for subscription change notifications. + * + * @param topic The topic to unregister for notifications. + * @param handler The {@link SubscriptionChangeHandler} to be unregistered. + * @return {@link CompletionStage} completed successfully with {@link NotificationResponse} with + * the status of the API call to uSubscription service, or completed unsuccessfully with + * {@link UStatus} with the reason for the failure. + */ + default CompletionStage unregisterForNotifications(UUri topic, + SubscriptionChangeHandler handler) { + return unregisterForNotifications(topic, handler, CallOptions.DEFAULT); + } + + + /** + * Unregister for subscription change notifications. + * + * @param topic The topic to unregister for notifications. + * @param handler The {@link SubscriptionChangeHandler} to be unregistered. + * @param options The {@link CallOptions} to be used for the request. + * @return {@link CompletionStage} completed successfully with {@link NotificationResponse} with + * the status of the API call to uSubscription service, or completed unsuccessfully with + * {@link UStatus} with the reason for the failure. + */ + CompletionStage unregisterForNotifications(UUri topic, SubscriptionChangeHandler handler, + CallOptions options); + + + /** + * Fetch the list of subscribers for a given produced topic. + * + * @param topic The topic to fetch the subscribers for. + * @return {@link CompletionStage} completed successfully with {@link FetchSubscribersResponse} with + * the list of subscribers, or completed unsuccessfully with {@link UStatus} with the reason + * for the failure. + */ + default CompletionStage fetchSubscribers(UUri topic) { + return fetchSubscribers(topic, CallOptions.DEFAULT); + } + + + /** + * Fetch the list of subscribers for a given produced topic. + * + * @param topic The topic to fetch the subscribers for. + * @param options The {@link CallOptions} to be used for the request. + * @return {@link CompletionStage} completed successfully with {@link FetchSubscribersResponse} with + * the list of subscribers, or completed unsuccessfully with {@link UStatus} with the reason + * for the failure. + */ + CompletionStage fetchSubscribers(UUri topic, CallOptions options); + + + /** + * Fetch list of Subscriptions for a given topic. + * + * API provides more information than {@code fetchSubscribers()} in that it also returns + * {@link SubscribeAttributes} per subscriber that might be useful to the producer to know. + * + * @param topic The topic to fetch subscriptions for. + * @return {@link CompletionStage} completed successfully with {@link FetchSubscriptionsResponse} that + * contains the subscription information per subscriber to the topic or completed unsuccessfully with + * {@link UStatus} with the reason for the failure. {@link UCode.PERMISSION_DENIED} is returned if the + * topic ue_id does not equal the callers ue_id. + */ + default CompletionStage fetchSubscriptions(FetchSubscriptionsRequest request) { + return fetchSubscriptions(request, CallOptions.DEFAULT); + } + + + /** + * Fetch list of Subscriptions for a given topic. + * + * API provides more information than {@code fetchSubscribers()} in that it also returns + * {@link SubscribeAttributes} per subscriber that might be useful to the producer to know. + * + * @param topic The topic to fetch subscriptions for. + * @param options The {@link CallOptions} to be used for the request. + * @return {@link CompletionStage} completed successfully with {@link FetchSubscriptionsResponse} that + * contains the subscription information per subscriber to the topic or completed unsuccessfully with + * {@link UStatus} with the reason for the failure. {@link UCode.PERMISSION_DENIED} is returned if the + * topic ue_id does not equal the callers ue_id. + */ + CompletionStage fetchSubscriptions(FetchSubscriptionsRequest request, + CallOptions options); + +} diff --git a/src/main/java/org/eclipse/uprotocol/communication/Subscriber.java b/src/main/java/org/eclipse/uprotocol/communication/Subscriber.java deleted file mode 100644 index 82d61041..00000000 --- a/src/main/java/org/eclipse/uprotocol/communication/Subscriber.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Apache License Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.eclipse.uprotocol.communication; - -import java.util.concurrent.CompletionStage; -import org.eclipse.uprotocol.transport.UListener; -import org.eclipse.uprotocol.core.usubscription.v3.SubscriptionResponse; -import org.eclipse.uprotocol.v1.UStatus; -import org.eclipse.uprotocol.v1.UUri; - - -/** - * Communication Layer (uP-L2) Subscriber interface.
- * - * This interface provides APIs to subscribe and unsubscribe to a given topic. - */ -public interface Subscriber { - - /** - * Subscribe to a given topic.
- * - * The API will return a {@link CompletionStage} with the response {@link SubscriptionResponse} or exception - * with the failure if the subscription was not successful. The API will also register the listener to be - * called when messages are received. - * - * @param topic The topic to subscribe to. - * @param listener The {@link UListener} that is called when published messages are received. - * @return Returns the CompletionStage with {@link SubscriptionResponse} or exception with the failure - * reason as {@link UStatus}. - */ - default CompletionStage subscribe(UUri topic, UListener listener) { - return subscribe(topic, listener, null); - } - - - /** - * Subscribe to a given topic.
- * - * The API will return a {@link CompletionStage} with the response {@link SubscriptionResponse} or exception - * with the failure if the subscription was not successful. The API will also register the listener to be - * called when messages are received. - * - * @param topic The topic to subscribe to. - * @param listener The {@link UListener} that is called when published messages are received. - * @param options The {@link CallOptions} to provide additional information (timeout, token, etc...). - * @return Returns the CompletionStage with the response UMessage or exception with the failure - * reason as {@link UStatus}. - */ - default CompletionStage subscribe(UUri topic, UListener listener, CallOptions options) { - return subscribe(topic, listener, options, null); - } - - - /** - * Subscribe to a given topic.
- * - * The API will return a {@link CompletionStage} with the response {@link SubscriptionResponse} or exception - * with the failure if the subscription was not successful. The API will also register the listener to be - * called when messages are received and allow the caller to register a {@link SubscriptionChangeHandler} - * that is called whenever the subscription state changes (ex. PENDING to SUBSCRIBED, - * SUBSCRIBED to UNSUBSCRIBED, etc...). - * - * @param topic The topic to subscribe to. - * @param listener The {@link UListener} that is called when published messages are received. - * @param options The {@link CallOptions} to provide additional information (timeout, token, etc...). - * @param handler {@link SubscriptionChangeHandler} to handle changes to subscription states - * @return Returns the CompletionStage with the response UMessage or exception with the failure - * reason as {@link UStatus}. - */ - CompletionStage subscribe(UUri topic, UListener listener, CallOptions options, - SubscriptionChangeHandler handler); - - - /** - * Unsubscribe to a given topic.
- * - * The subscriber no longer wishes to be subscribed to said topic so we issue a unsubscribe - * request to the USubscription service. - * - * @param topic The topic to unsubscribe to. - * @param listener The listener to be called when a message is received on the topic. - * @return Returns {@link UStatus} with the result from the unsubscribe request. - */ - default CompletionStage unsubscribe(UUri topic, UListener listener) { - return unsubscribe(topic, listener, CallOptions.DEFAULT); - } - - /** - * Unsubscribe to a given topic.
- * - * The subscriber no longer wishes to be subscribed to said topic so we issue a unsubscribe - * request to the USubscription service. - * - * @param topic The topic to unsubscribe to. - * @param listener The listener to be called when a message is received on the topic. - * @param options The call options for the subscription. - * @return Returns {@link UStatus} with the result from the unsubscribe request. - */ - CompletionStage unsubscribe(UUri topic, UListener listener, CallOptions options); - - - /** - * Unregister a listener from a topic.
- * - * This method will only unregister the listener for a given subscription thus allowing a uE to stay - * subscribed even if the listener is removed. - * - * @param topic The topic to subscribe to. - * @param listener The listener to be called when a message is received on the topic. - * @return Returns {@link UStatus} with the status of the listener unregister request. - */ - CompletionStage unregisterListener(UUri topic, UListener listener); -} \ No newline at end of file diff --git a/src/main/java/org/eclipse/uprotocol/communication/UClient.java b/src/main/java/org/eclipse/uprotocol/communication/UClient.java index e270b36d..82257dbf 100644 --- a/src/main/java/org/eclipse/uprotocol/communication/UClient.java +++ b/src/main/java/org/eclipse/uprotocol/communication/UClient.java @@ -14,7 +14,7 @@ import java.util.Objects; import java.util.concurrent.CompletionStage; -import org.eclipse.uprotocol.core.usubscription.v3.SubscriptionResponse; + import org.eclipse.uprotocol.transport.UListener; import org.eclipse.uprotocol.transport.UTransport; import org.eclipse.uprotocol.v1.UStatus; @@ -23,7 +23,7 @@ /** * Default implementation of the communication layer that uses the {@link UTransport}. */ -public class UClient implements RpcServer, Subscriber, Notifier, Publisher, RpcClient { +public class UClient implements RpcServer, Notifier, Publisher, RpcClient { // The transport to use for sending the RPC requests private final UTransport transport; @@ -32,7 +32,6 @@ public class UClient implements RpcServer, Subscriber, Notifier, Publisher, RpcC private final SimplePublisher publisher; private final SimpleNotifier notifier; private final InMemoryRpcClient rpcClient; - private final InMemorySubscriber subscriber; private UClient (UTransport transport) { this.transport = transport; @@ -41,27 +40,9 @@ private UClient (UTransport transport) { publisher = new SimplePublisher(transport); notifier = new SimpleNotifier(transport); rpcClient = new InMemoryRpcClient(transport); - subscriber = new InMemorySubscriber(transport, rpcClient, notifier); - } - - - @Override - public CompletionStage subscribe(UUri topic, UListener listener, - CallOptions options, SubscriptionChangeHandler handler) { - return subscriber.subscribe(topic, listener, options, handler); } - @Override - public CompletionStage unsubscribe(UUri topic, UListener listener, CallOptions options) { - return subscriber.unsubscribe(topic, listener, options); - } - - @Override - public CompletionStage unregisterListener(UUri topic, UListener listener) { - return subscriber.unregisterListener(topic, listener); - } - @Override public CompletionStage notify(UUri topic, UUri destination, CallOptions options, UPayload payload) { return notifier.notify(topic, destination, options, payload); @@ -117,6 +98,5 @@ public static UClient create(UTransport transport) { public void close() { rpcClient.close(); - subscriber.close(); } } diff --git a/src/main/java/org/eclipse/uprotocol/transport/UTransport.java b/src/main/java/org/eclipse/uprotocol/transport/UTransport.java index eae086f2..9b760036 100644 --- a/src/main/java/org/eclipse/uprotocol/transport/UTransport.java +++ b/src/main/java/org/eclipse/uprotocol/transport/UTransport.java @@ -15,6 +15,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletableFuture; +import org.eclipse.uprotocol.uri.factory.UriFactory; import org.eclipse.uprotocol.v1.UCode; import org.eclipse.uprotocol.v1.UMessage; import org.eclipse.uprotocol.v1.UStatus; @@ -57,7 +58,7 @@ public interface UTransport { * correctly, otherwise it returns with the appropriate failure. */ default CompletionStage registerListener(UUri sourceFilter, UListener listener) { - return registerListener(sourceFilter, null, listener); + return registerListener(sourceFilter, UriFactory.ANY, listener); } @@ -93,7 +94,7 @@ default CompletionStage registerListener(UUri sourceFilter, UListener l * correctly, otherwise it returns with the appropriate failure. */ default CompletionStage unregisterListener(UUri sourceFilter, UListener listener) { - return unregisterListener(sourceFilter, null, listener); + return unregisterListener(sourceFilter, UriFactory.ANY, listener); } diff --git a/src/test/java/org/eclipse/uprotocol/communication/InMemorySubscriberTest.java b/src/test/java/org/eclipse/uprotocol/client/usubscription/v3/InMemoryUSubscriptionClientTest.java similarity index 58% rename from src/test/java/org/eclipse/uprotocol/communication/InMemorySubscriberTest.java rename to src/test/java/org/eclipse/uprotocol/client/usubscription/v3/InMemoryUSubscriptionClientTest.java index 45739e52..14b30334 100644 --- a/src/test/java/org/eclipse/uprotocol/communication/InMemorySubscriberTest.java +++ b/src/test/java/org/eclipse/uprotocol/client/usubscription/v3/InMemoryUSubscriptionClientTest.java @@ -10,7 +10,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.eclipse.uprotocol.communication; +package org.eclipse.uprotocol.client.usubscription.v3; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CompletableFuture; @@ -18,6 +18,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.Executors; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -38,7 +39,15 @@ import static org.mockito.Mockito.mock; import org.mockito.junit.jupiter.MockitoExtension; - +import org.eclipse.uprotocol.communication.CallOptions; +import org.eclipse.uprotocol.communication.InMemoryRpcClient; +import org.eclipse.uprotocol.communication.SimpleNotifier; +import org.eclipse.uprotocol.communication.UPayload; +import org.eclipse.uprotocol.communication.UStatusException; +import org.eclipse.uprotocol.core.usubscription.v3.FetchSubscribersResponse; +import org.eclipse.uprotocol.core.usubscription.v3.FetchSubscriptionsRequest; +import org.eclipse.uprotocol.core.usubscription.v3.FetchSubscriptionsResponse; +import org.eclipse.uprotocol.core.usubscription.v3.NotificationsResponse; import org.eclipse.uprotocol.core.usubscription.v3.SubscriptionResponse; import org.eclipse.uprotocol.core.usubscription.v3.SubscriptionStatus; import org.eclipse.uprotocol.core.usubscription.v3.UnsubscribeResponse; @@ -52,7 +61,7 @@ import org.eclipse.uprotocol.v1.UUri; @ExtendWith(MockitoExtension.class) -public class InMemorySubscriberTest { +public class InMemoryUSubscriptionClientTest { @Mock private UTransport transport; @@ -84,6 +93,32 @@ public void setup() { } + @Test + @DisplayName("Testing creation of InMemoryUSubscriptionClient passing only the transport") + public void test_creation_of_InMemoryUSubscriptionClient_passing_only_the_transport() { + when(transport.getSource()).thenReturn(source); + + when(transport.registerListener(any(UUri.class), any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + assertDoesNotThrow(() -> { + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport); + }); + + verify(transport, times(2)).getSource(); + verify(transport, times(2)).registerListener(any(), any(), any()); + } + + + @Test + @DisplayName("Testing creation of InMemoryUSubscriptionClient passing null for the transport") + public void test_creation_of_InMemoryUSubscriptionClient_passing_null_for_the_transport() { + assertThrows(NullPointerException.class, () -> { + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(null); + }); + } + + @Test @DisplayName("Testing simple mock of RpcClient and notifier happy path") public void test_simple_mock_of_rpcClient_and_notifier() { @@ -105,7 +140,7 @@ public void test_simple_mock_of_rpcClient_and_notifier() { .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); - InMemorySubscriber subscriber = new InMemorySubscriber(transport, rpcClient, notifier); + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); assertNotNull(subscriber); assertDoesNotThrow(() -> { @@ -140,7 +175,7 @@ public void test_simple_mock_of_rpcClient_and_notifier_returned_subscribe_pendin when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); - InMemorySubscriber subscriber = new InMemorySubscriber(transport, rpcClient, notifier); + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); assertNotNull(subscriber); assertDoesNotThrow(() -> { @@ -173,7 +208,7 @@ public void test_simple_mock_when_subscription_service_returns_unsubscribed() { .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); - InMemorySubscriber subscriber = new InMemorySubscriber(transport, rpcClient, notifier); + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); assertNotNull(subscriber); assertDoesNotThrow(() -> { @@ -200,7 +235,7 @@ void test_subscribe_using_mock_RpcClient_and_SimplerNotifier_when_invokemethod_r when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); - InMemorySubscriber subscriber = new InMemorySubscriber(transport, rpcClient, notifier); + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); assertNotNull(subscriber); assertThrows(CompletionException.class, () -> { @@ -240,7 +275,7 @@ void test_subscribe_when_we_pass_a_subscription_change_notification_handler() { when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); - InMemorySubscriber subscriber = new InMemorySubscriber(transport, rpcClient, notifier); + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); assertNotNull(subscriber); SubscriptionChangeHandler handler = new SubscriptionChangeHandler() { @@ -279,7 +314,7 @@ void test_subscribe_when_we_try_to_subscribe_to_the_same_topic_twice() { when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); - InMemorySubscriber subscriber = new InMemorySubscriber(transport, rpcClient, notifier); + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); assertNotNull(subscriber); SubscriptionChangeHandler handler = new SubscriptionChangeHandler() { @@ -323,7 +358,7 @@ void test_subscribe_to_the_same_topic_twice_passing_different_subscription_chang when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); - InMemorySubscriber subscriber = new InMemorySubscriber(transport, rpcClient, notifier); + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); assertNotNull(subscriber); SubscriptionChangeHandler handler1 = new SubscriptionChangeHandler() { @@ -346,8 +381,7 @@ public void handleSubscriptionChange(UUri topic, SubscriptionStatus status) { }); assertThrows( CompletionException.class, () -> { - CompletionStage response = subscriber.subscribe(topic, listener, - CallOptions.DEFAULT, handler2); + CompletionStage response = subscriber.subscribe(topic, listener, CallOptions.DEFAULT, handler2); assertTrue(response.toCompletableFuture().isCompletedExceptionally()); response.handle((r, e) -> { @@ -381,7 +415,7 @@ void test_unsubscribe_using_mock_RpcClient_and_SimplerNotifier() { when(notifier.unregisterNotificationListener(any(UUri.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); - InMemorySubscriber subscriber = new InMemorySubscriber(transport, rpcClient, notifier); + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); assertNotNull(subscriber); assertDoesNotThrow(() -> { @@ -409,7 +443,7 @@ void test_unsubscribe_when_invokemethod_return_an_exception() { when(notifier.unregisterNotificationListener(any(UUri.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); - InMemorySubscriber subscriber = new InMemorySubscriber(transport, rpcClient, notifier); + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); assertNotNull(subscriber); assertDoesNotThrow(() -> { @@ -446,7 +480,7 @@ void test_unsubscribe_when_invokemethod_returned_OK_but_we_failed_to_unregister_ when(notifier.unregisterNotificationListener(any(UUri.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); - InMemorySubscriber subscriber = new InMemorySubscriber(transport, rpcClient, notifier); + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); assertNotNull(subscriber); assertDoesNotThrow(() -> { @@ -507,7 +541,7 @@ public void run() { }); - InMemorySubscriber subscriber = new InMemorySubscriber(transport, rpcClient, notifier); + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); assertNotNull(subscriber); SubscriptionChangeHandler handler = new SubscriptionChangeHandler() { @@ -575,7 +609,7 @@ public void run() { }); - InMemorySubscriber subscriber = new InMemorySubscriber(transport, rpcClient, notifier); + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); assertNotNull(subscriber); SubscriptionChangeHandler handler = new SubscriptionChangeHandler() { @@ -644,7 +678,7 @@ public void run() { }); - InMemorySubscriber subscriber = new InMemorySubscriber(transport, rpcClient, notifier); + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); assertNotNull(subscriber); SubscriptionChangeHandler handler = new SubscriptionChangeHandler() { @@ -711,7 +745,7 @@ public void run() { }); - InMemorySubscriber subscriber = new InMemorySubscriber(transport, rpcClient, notifier); + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); assertNotNull(subscriber); SubscriptionChangeHandler handler = new SubscriptionChangeHandler() { @@ -780,7 +814,7 @@ public void run() { }); - InMemorySubscriber subscriber = new InMemorySubscriber(transport, rpcClient, notifier); + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); assertNotNull(subscriber); assertDoesNotThrow(() -> { @@ -799,4 +833,474 @@ public void run() { verify(transport, times(1)).registerListener(any(), any()); verify(transport, times(1)).getSource(); } + + + @Test + @DisplayName("Test registerNotification() api when passed a null topic") + void test_registerNotification_api_when_passed_a_null_topic() { + when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); + assertNotNull(subscriber); + + SubscriptionChangeHandler handler = new SubscriptionChangeHandler() { + @Override + public void handleSubscriptionChange(UUri topic, SubscriptionStatus status) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'handleSubscriptionChange'"); + } + }; + + assertThrows(NullPointerException.class, () -> { + subscriber.registerForNotifications(null, handler); + }); + + verify(notifier, times(1)).registerNotificationListener(any(), any()); + } + + + @Test + @DisplayName("Test registerNotification() api when passed a null handler") + void test_registerNotification_api_when_passed_a_null_handler() { + when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); + assertNotNull(subscriber); + + assertThrows(NullPointerException.class, () -> { + subscriber.registerForNotifications(topic, null); + }); + + verify(notifier, times(1)).registerNotificationListener(any(), any()); + } + + @Test + @DisplayName("Test unregisterListener() api for the happy path") + void test_unregisterListener_api_for_the_happy_path() { + when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + when(transport.getSource()).thenReturn(source); + + when(transport.registerListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + when(transport.unregisterListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + when(rpcClient.invokeMethod(any(UUri.class), any(UPayload.class), any(CallOptions.class))) + .thenReturn(CompletableFuture.completedFuture(UPayload.pack(SubscriptionResponse.newBuilder() + .setTopic(topic) + .setStatus(SubscriptionStatus.newBuilder().setState(SubscriptionStatus.State.SUBSCRIBED).build()) + .build()))); + + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); + assertNotNull(subscriber); + + SubscriptionChangeHandler handler = new SubscriptionChangeHandler() { + @Override + public void handleSubscriptionChange(UUri topic, SubscriptionStatus status) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'handleSubscriptionChange'"); + } + }; + + UListener listener = new UListener() { + @Override + public void onReceive(UMessage message) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'onReceive'"); + } + }; + + assertDoesNotThrow(() -> { + assertEquals(subscriber.subscribe(topic, listener).toCompletableFuture().get().getStatus().getState(), + SubscriptionStatus.State.SUBSCRIBED); + assertEquals(subscriber.unregisterListener(topic, listener).toCompletableFuture().get().getCode(), UCode.OK); + }); + + verify(transport, times(1)).unregisterListener(any(), any()); + } + + + @Test + @DisplayName("Test registerNotification() api when passed a valid topic and handler") + void test_registerNotification_api_when_passed_a_valid_topic_and_handler() { + when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + when(transport.getSource()).thenReturn(source); + + when(rpcClient.invokeMethod(any(UUri.class), any(UPayload.class), any(CallOptions.class))) + .thenReturn(CompletableFuture.completedFuture(UPayload.pack(NotificationsResponse.getDefaultInstance()))); + + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); + assertNotNull(subscriber); + + SubscriptionChangeHandler handler = new SubscriptionChangeHandler() { + @Override + public void handleSubscriptionChange(UUri topic, SubscriptionStatus status) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'handleSubscriptionChange'"); + } + }; + + UUri topic = UUri.newBuilder(transport.getSource()).setResourceId(0x8000).build(); + + assertDoesNotThrow(() -> subscriber.registerForNotifications(topic, handler) .toCompletableFuture().get()); + verify(notifier, times(1)).registerNotificationListener(any(), any()); + } + + + @Test + @DisplayName("Test registerNotification() api when invokeMethod() throws an exception") + void test_registerNotification_api_when_invokeMethod_throws_an_exception() { + when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + when(transport.getSource()).thenReturn(source); + + when(rpcClient.invokeMethod(any(UUri.class), any(UPayload.class), any(CallOptions.class))) + .thenReturn(CompletableFuture.failedFuture( + new UStatusException(UCode.PERMISSION_DENIED, "Not permitted"))); + + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); + assertNotNull(subscriber); + + SubscriptionChangeHandler handler = new SubscriptionChangeHandler() { + @Override + public void handleSubscriptionChange(UUri topic, SubscriptionStatus status) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'handleSubscriptionChange'"); + } + }; + + UUri topic = UUri.newBuilder(transport.getSource()).setResourceId(0x8000).build(); + + assertDoesNotThrow(() -> { + CompletionStage response = subscriber.registerForNotifications(topic, handler); + assertTrue(response.toCompletableFuture().isCompletedExceptionally()); + response.handle((r, e) -> { + e = e.getCause(); + assertTrue(e instanceof UStatusException); + assertEquals(((UStatusException) e).getCode(), UCode.PERMISSION_DENIED); + return null; + }).toCompletableFuture().get(); + }); + } + + + @Test + @DisplayName("Test registerNotification() calling the API twice passing the same topic and handler") + void test_registerNotification_api_calling_the_api_twice_passing_the_same_topic_and_handler() { + when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + when(transport.getSource()).thenReturn(source); + + when(rpcClient.invokeMethod(any(UUri.class), any(UPayload.class), any(CallOptions.class))) + .thenReturn(CompletableFuture.completedFuture(UPayload.pack(NotificationsResponse.getDefaultInstance()))); + + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); + assertNotNull(subscriber); + + SubscriptionChangeHandler handler = new SubscriptionChangeHandler() { + @Override + public void handleSubscriptionChange(UUri topic, SubscriptionStatus status) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'handleSubscriptionChange'"); + } + }; + + UUri topic = UUri.newBuilder(transport.getSource()).setResourceId(0x8000).build(); + + assertDoesNotThrow(() -> { + assertTrue(NotificationsResponse.getDefaultInstance().equals( + subscriber.registerForNotifications(topic, handler).toCompletableFuture().get())); + }); + + assertDoesNotThrow(() -> { + assertTrue(NotificationsResponse.getDefaultInstance().equals( + subscriber.registerForNotifications(topic, handler).toCompletableFuture().get())); + }); + + verify(notifier, times(1)).registerNotificationListener(any(), any()); + } + + + @Test + @DisplayName("Test registerNotification() calling the API twice passing the same topic but different handlers") + void test_registerNotification_api_calling_the_api_twice_passing_the_same_topic_but_different_handlers() { + when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + when(transport.getSource()).thenReturn(source); + + when(rpcClient.invokeMethod(any(UUri.class), any(UPayload.class), any(CallOptions.class))) + .thenReturn(CompletableFuture.completedFuture(UPayload.pack(NotificationsResponse.getDefaultInstance()))); + + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); + assertNotNull(subscriber); + + SubscriptionChangeHandler handler1 = new SubscriptionChangeHandler() { + @Override + public void handleSubscriptionChange(UUri topic, SubscriptionStatus status) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'handleSubscriptionChange'"); + } + }; + + SubscriptionChangeHandler handler2 = new SubscriptionChangeHandler() { + @Override + public void handleSubscriptionChange(UUri topic, SubscriptionStatus status) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'handleSubscriptionChange'"); + } + }; + + UUri topic = UUri.newBuilder(transport.getSource()).setResourceId(0x8000).build(); + + assertDoesNotThrow(() -> { + assertTrue(NotificationsResponse.getDefaultInstance().equals( + subscriber.registerForNotifications(topic, handler1).toCompletableFuture().get())); + }); + + assertDoesNotThrow(() -> { + CompletionStage response = subscriber.registerForNotifications(topic, handler2); + assertTrue(response.toCompletableFuture().isCompletedExceptionally()); + + response.handle((r, e) -> { + assertNotNull(e); + e = e.getCause(); + assertTrue(e instanceof UStatusException); + assertEquals(((UStatusException) e).getCode(), UCode.ALREADY_EXISTS); + return null; + }).toCompletableFuture().join(); + }); + + verify(notifier, times(1)).registerNotificationListener(any(), any()); + } + + + @Test + @DisplayName("Test unregisterNotification() api for the happy path") + void test_unregisterNotification_api_for_the_happy_path() { + when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + when(transport.getSource()).thenReturn(source); + + when(rpcClient.invokeMethod(any(UUri.class), any(UPayload.class), any(CallOptions.class))) + .thenReturn(CompletableFuture.completedFuture(UPayload.pack(NotificationsResponse.getDefaultInstance()))); + + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); + assertNotNull(subscriber); + + SubscriptionChangeHandler handler = new SubscriptionChangeHandler() { + @Override + public void handleSubscriptionChange(UUri topic, SubscriptionStatus status) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'handleSubscriptionChange'"); + } + }; + UUri topic = UUri.newBuilder(transport.getSource()).setResourceId(0x8000).build(); + + assertDoesNotThrow(() -> { + subscriber.registerForNotifications(topic, handler).toCompletableFuture().get(); + subscriber.unregisterForNotifications(topic, handler).toCompletableFuture().get(); + }); + + verify(notifier, times(1)).registerNotificationListener(any(), any()); + } + + + @Test + @DisplayName("Test unregisterNotification() api when passed a null topic") + void test_unregisterNotification_api_when_passed_a_null_topic() { + when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); + assertNotNull(subscriber); + + SubscriptionChangeHandler handler = new SubscriptionChangeHandler() { + @Override + public void handleSubscriptionChange(UUri topic, SubscriptionStatus status) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'handleSubscriptionChange'"); + } + }; + + assertThrows(NullPointerException.class, () -> { + subscriber.unregisterForNotifications(null, handler); + }); + + verify(notifier, times(1)).registerNotificationListener(any(), any()); + } + + + @Test + @DisplayName("Test unregisterNotification() api when passed a null handler") + void test_unregisterNotification_api_when_passed_a_null_handler() { + when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); + assertNotNull(subscriber); + + assertThrows(NullPointerException.class, () -> { + subscriber.unregisterForNotifications(topic, null); + }); + + verify(notifier, times(1)).registerNotificationListener(any(), any()); + } + + + @Test + @DisplayName("Test calling unregisterNotification() api when we never registered the notification below") + void test_calling_unregisterNotification_api_when_we_never_registered_the_notification_below() { + when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + when(transport.getSource()).thenReturn(source); + + when(rpcClient.invokeMethod(any(UUri.class), any(UPayload.class), any(CallOptions.class))) + .thenReturn(CompletableFuture.failedFuture(new UStatusException(UCode.NOT_FOUND, "Not found"))); + + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); + assertNotNull(subscriber); + + SubscriptionChangeHandler handler = new SubscriptionChangeHandler() { + @Override + public void handleSubscriptionChange(UUri topic, SubscriptionStatus status) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'handleSubscriptionChange'"); + } + }; + UUri topic = UUri.newBuilder(transport.getSource()).setResourceId(0x8000).build(); + + assertDoesNotThrow(() -> { + CompletionStage response = subscriber.unregisterForNotifications(topic, handler); + assertTrue(response.toCompletableFuture().isCompletedExceptionally()); + + response.handle((r, e) -> { + assertNotNull(e); + e = e.getCause(); + assertTrue(e instanceof UStatusException); + assertEquals(((UStatusException) e).getCode(), UCode.NOT_FOUND); + return null; + }).toCompletableFuture().join(); + }); + + verify(transport, times(2)).getSource(); + verify(notifier, times(1)).registerNotificationListener(any(), any()); + } + + + @Test + @DisplayName("Test fetchSubscribers() when pssing null topic") + void test_fetchSubscribers_when_passing_null_topic() { + when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); + assertNotNull(subscriber); + + assertThrows(NullPointerException.class, () -> { + subscriber.fetchSubscribers(null).toCompletableFuture().get(); + }); + } + + + @Test + @DisplayName("Test fetchSubscribers() when passing a valid topic") + void test_fetchSubscribers_when_passing_a_valid_topic() { + when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + when(rpcClient.invokeMethod(any(UUri.class), any(UPayload.class), any(CallOptions.class))) + .thenReturn(CompletableFuture.completedFuture( + UPayload.pack(FetchSubscribersResponse.getDefaultInstance()))); + + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); + assertNotNull(subscriber); + + assertDoesNotThrow(() -> { + assertTrue(FetchSubscribersResponse.getDefaultInstance().equals( + subscriber.fetchSubscribers(topic).toCompletableFuture().get())); + }); + + verify(notifier, times(1)).registerNotificationListener(any(), any()); + } + + + @Test + @DisplayName("Test fetchSubscribers() when passing when invokeMethod returns NOT_PERMITTED") + void test_fetchSubscribers_when_passing_when_invokeMethod_returns_NOT_PERMITTED() { + when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + when(rpcClient.invokeMethod(any(UUri.class), any(UPayload.class), any(CallOptions.class))) + .thenReturn(CompletableFuture.failedFuture( + new UStatusException(UCode.PERMISSION_DENIED, "Not permitted"))); + + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); + assertNotNull(subscriber); + + assertDoesNotThrow(() -> { + CompletionStage response = subscriber.fetchSubscribers(topic); + assertTrue(response.toCompletableFuture().isCompletedExceptionally()); + response.handle((r, e) -> { + e = e.getCause(); + assertTrue(e instanceof UStatusException); + assertEquals(((UStatusException) e).getCode(), UCode.PERMISSION_DENIED); + return null; + }).toCompletableFuture().join(); + }); + + verify(notifier, times(1)).registerNotificationListener(any(), any()); + } + + + @Test + @DisplayName("Test fetchSubscriptions() passing null FetchSubscriptionRequest") + void test_fetchSubscriptions_passing_null_FetchSubscriptionRequest() { + when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); + assertNotNull(subscriber); + + assertThrows(NullPointerException.class, () -> { + subscriber.fetchSubscriptions(null).toCompletableFuture().get(); + }); + + verify(notifier, times(1)).registerNotificationListener(any(), any()); + } + + + @Test + @DisplayName("Test fetchSubscriptions() passing a valid FetchSubscriptionRequest") + void test_fetchSubscriptions_passing_a_valid_FetchSubscriptionRequest() { + when(notifier.registerNotificationListener(any(UUri.class), any(UListener.class))) + .thenReturn(CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build())); + + when(rpcClient.invokeMethod(any(UUri.class), any(UPayload.class), any(CallOptions.class))) + .thenReturn(CompletableFuture.completedFuture( + UPayload.pack(FetchSubscriptionsResponse.getDefaultInstance()))); + + InMemoryUSubscriptionClient subscriber = new InMemoryUSubscriptionClient(transport, rpcClient, notifier); + assertNotNull(subscriber); + + FetchSubscriptionsRequest request = FetchSubscriptionsRequest.newBuilder().setTopic(topic).build(); + + assertDoesNotThrow(() -> { + assertTrue(FetchSubscriptionsResponse.getDefaultInstance().equals( + subscriber.fetchSubscriptions(request).toCompletableFuture().get())); + }); + + verify(notifier, times(1)).registerNotificationListener(any(), any()); + } } diff --git a/src/test/java/org/eclipse/uprotocol/communication/UClientTest.java b/src/test/java/org/eclipse/uprotocol/communication/UClientTest.java index 85c50054..93af3790 100644 --- a/src/test/java/org/eclipse/uprotocol/communication/UClientTest.java +++ b/src/test/java/org/eclipse/uprotocol/communication/UClientTest.java @@ -13,12 +13,9 @@ package org.eclipse.uprotocol.communication; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import org.eclipse.uprotocol.transport.UListener; -import org.eclipse.uprotocol.v1.UCode; import org.eclipse.uprotocol.v1.UMessage; -import org.eclipse.uprotocol.v1.UStatus; import org.eclipse.uprotocol.v1.UUri; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -46,18 +43,6 @@ public void onReceive(UMessage message) { assertDoesNotThrow(() -> client.invokeMethod(createMethodUri(), null, null).toCompletableFuture().get()); - assertDoesNotThrow(() -> - client.subscribe(createTopic(), listener).toCompletableFuture().get()); - - assertDoesNotThrow(() -> - client.unsubscribe(createTopic(), listener).toCompletableFuture().get()); - - // The listener is not registered anymore so it should fail - assertDoesNotThrow(() -> { - UStatus result = client.unregisterListener(createTopic(), listener).toCompletableFuture().get(); - assertEquals(result.getCode(), UCode.NOT_FOUND); - }); - assertDoesNotThrow(() -> client.registerNotificationListener(createTopic(), listener).toCompletableFuture().get());