Skip to content

Commit

Permalink
Add CallOptions, handler for Subscriber and unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
dddj698 committed Jul 10, 2024
1 parent 22335ca commit 97950d0
Show file tree
Hide file tree
Showing 26 changed files with 1,175 additions and 189 deletions.
7 changes: 3 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@
<maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
<slf4j.version>2.6.18</slf4j.version>
<protobuf.version>3.21.10</protobuf.version>
<git.tag.name>main</git.tag.name>

<up-spec.git.tag.name>v1.6.0-alpha.2</up-spec.git.tag.name>
</properties>

<scm>
Expand Down Expand Up @@ -216,7 +215,7 @@
<arguments>
<argument>clone</argument>
<argument>--branch</argument>
<argument>${git.tag.name}</argument>
<argument>${up-spec.git.tag.name}</argument>
<argument>https://github.com/eclipse-uprotocol/up-spec.git</argument>
<argument>${project.build.directory}/up-spec</argument>
</arguments>
Expand All @@ -239,7 +238,7 @@
<arguments>
<argument>pull</argument>
<argument>origin</argument>
<argument>${git.tag.name}</argument>
<argument>${up-spec.git.tag.name}</argument>
</arguments>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ class InMemoryRpcClient(
}
}

/**
* Close the RPC client and clean up any resources
*/
fun close() {
mRequests.clear()
scope.launch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
*/
package org.eclipse.uprotocol.communication

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.eclipse.uprotocol.core.usubscription.v3.*
import org.eclipse.uprotocol.transport.UListener
import org.eclipse.uprotocol.transport.UTransport
import org.eclipse.uprotocol.uri.factory.UUriFactory
import org.eclipse.uprotocol.v1.UCode
import org.eclipse.uprotocol.v1.UStatus
import org.eclipse.uprotocol.v1.UUri
import org.eclipse.uprotocol.v1.uStatus
import java.util.concurrent.CompletionException
import java.util.concurrent.CompletionStage
import org.eclipse.uprotocol.v1.*
import java.util.*
import java.util.logging.Logger

/**
* The following is an example implementation of the [Subscriber] interface that
Expand All @@ -36,73 +36,115 @@ import java.util.concurrent.CompletionStage
* @param transport the transport to use for sending the notifications
* @param rpcClient the rpc client to use for sending the RPC requests
*/
class InMemorySubscriber(private val transport: UTransport, private val rpcClient: RpcClient) : Subscriber {
class InMemorySubscriber(
private val transport: UTransport,
private val rpcClient: RpcClient,
private val notifier: Notifier,
dispatcher: CoroutineDispatcher = Dispatchers.IO
) : Subscriber {
// Map to store subscription change notification handlers
private val mHandlers = HashMap<UUri, SubscriptionChangeHandler>()

private val scope = CoroutineScope(SupervisorJob() + dispatcher)

private val mutex = Mutex()

// transport Notification listener that will process subscription change notifications
private val mNotificationListener = UListener { message ->
handleNotifications(message)
}

init {
scope.launch {
notifier.registerNotificationListener(NOTIFICATION_TOPIC, mNotificationListener)
}
}

/**
* Subscribe to a given topic.
*
* The API will return a [CompletionStage] with the response [SubscriptionResponse] or exception
* The API will return a [Result] with the response [SubscriptionResponse] or exception
* with the failure if the subscription was not successful. The API will also register the listener to be
* called when messages are received.
*
* @param topic The topic to subscribe to.
* @param listener The listener to be called when a message is received on the topic.
* @param options The call options for the subscription.
* @return Returns the CompletionStage with the response UMessage or exception with the failure
* @return Returns the [Result] with the response UMessage or exception with the failure
* reason as [UStatus].
*/
override suspend fun subscribe(
topic: UUri,
listener: UListener,
options: CallOptions
options: CallOptions,
handler: SubscriptionChangeHandler?
): Result<SubscriptionResponse> {
val subscribe: UUri = UUriFactory.fromProto(
USubscriptionProto.getDescriptor().services[0], METHOD_SUBSCRIBE
)

val request = subscriptionRequest {
this.topic = topic
this.subscriber = subscriberInfo { uri = transport.getSource() }
}

return rpcClient.invokeMethod(subscribe, UPayload.pack(request), options).mapToMessage<SubscriptionResponse>()
.also {
transport.registerListener(topic, listener = listener)
return rpcClient.invokeMethod(SUBSCRIBE_METHOD, UPayload.pack(request), options)
.mapToMessage<SubscriptionResponse>()
.onSuccess {
val state = it.status.state
if (state == SubscriptionStatus.State.SUBSCRIBED || state == SubscriptionStatus.State.SUBSCRIBE_PENDING) {
transport.registerListener(topic, listener = listener)
}
}.mapCatching { response ->
handler?.let { handler ->
mutex.withLock {
if (mHandlers[topic] != null && mHandlers[topic] != handler) {
throw UStatusException(UCode.ALREADY_EXISTS, "Handler already registered")
} else if (mHandlers[topic] == handler) {
response
} else {
mHandlers[topic] = handler
response
}
}
} ?: response
}
}


/**
* Unsubscribe to a given topic.
*
* The subscriber no longer wishes to be subscribed to said topic, so we issue an unsubscribe
* request to the USubscription service.
* The subscriber no longer wishes to be subscribed to said topic so we issue a unsubscribe
* request to the USubscription service. The API will return a [UStatus]. If we are unable to
* unsubscribe to the topic with USubscription service, the listener and handler (if any) will remain registered.
*
* @param topic The topic to unsubscribe to.
* @param listener The listener to be called when a message is received on the topic.
* @param options The call options for the subscription.
* @return Returns [UStatus] with the result from the unsubscribe request.
*/
override suspend fun unsubscribe(topic: UUri, listener: UListener, options: CallOptions): UStatus {
val unsubscribe: UUri = UUriFactory.fromProto(
USubscriptionProto.getDescriptor().services[0], METHOD_UNSUBSCRIBE
)
override suspend fun unsubscribe(
topic: UUri,
listener: UListener,
options: CallOptions
): UStatus {
val unsubscribeRequest = unsubscribeRequest { this.topic = topic }

return rpcClient.invokeMethod(
unsubscribe, UPayload.pack(unsubscribeRequest), options
UNSUBSCRIBE_METHOD, UPayload.pack(unsubscribeRequest), options
).mapToMessage<UnsubscribeResponse>().fold({
mutex.withLock {
mHandlers.remove(topic)
}
transport.unregisterListener(topic, listener = listener)
}, { e ->
val exception = if (e is CompletionException) (e.cause ?: e) else e
when (exception) {
when (e) {
is UStatusException -> {
exception.status
e.status
}

else -> {
uStatus {
code = UCode.INVALID_ARGUMENT
message = exception.message ?: "Invalid argument"
message = e.message ?: "Invalid argument"
}
}
}
Expand All @@ -111,21 +153,58 @@ class InMemorySubscriber(private val transport: UTransport, private val rpcClien


/**
* Unregister a listener from a topic.
* Unregister a listener and remove any registered [SubscriptionChangeHandler] for the topic.
*
* This method will only unregister the listener for a given subscription thus allowing a uE to stay
* subscribed even if the listener is removed.
* This method is used to remove handlers/listeners without notifying the uSubscription service
* so that we can be persistently subscribed even when the uE is not running.
*
* @param topic The topic to subscribe to.
* @param listener The listener to be called when a message is received on the topic.
* @return Returns [UStatus] with the status of the listener unregister request.
*/
override suspend fun unregisterListener(topic: UUri, listener: UListener): UStatus {
return transport.unregisterListener(topic, listener = listener)
return transport.unregisterListener(topic, listener = listener).also {
mHandlers.remove(topic)
}
}

fun close() {
mHandlers.clear()
scope.launch {
notifier.unregisterNotificationListener(NOTIFICATION_TOPIC, mNotificationListener)
}

}


/**
* Handle incoming notifications from the USubscription service.
*
* @param message The notification message from the USubscription service
*/
private fun handleNotifications(message: UMessage) {
// Ignore messages that are not notifications
message.takeIf { it.attributes.type == UMessageType.UMESSAGE_TYPE_NOTIFICATION }?.let { msg ->
// Unpack the notification message from uSubscription called Update
UPayload.unpack<Update>(msg.payload, msg.attributes.payloadFormat)?.let {
// Check if we have a handler registered for the subscription change notification for the specific
// topic that triggered the subscription change notification. It is very possible that the client
// did not register one to begin with (ex/ they don't care to receive it)
try{
mHandlers[it.topic]?.handleSubscriptionChange(it.topic, it.status)
}catch (e:Exception){
Logger.getGlobal().info(e.message)
}
}
}
}

companion object {
private const val METHOD_SUBSCRIBE = 1 // TODO: Fetch this from proto generated code
private const val METHOD_UNSUBSCRIBE = 2 // TODO: Fetch this from proto generated code
private val USUBSCRIPTION = USubscriptionProto.getDescriptor().services[0]

// TODO: The following items need to be pulled from generated code
private val SUBSCRIBE_METHOD: UUri = UUriFactory.fromProto(USUBSCRIPTION, 1)
private val UNSUBSCRIBE_METHOD: UUri = UUriFactory.fromProto(USUBSCRIPTION, 2)
private val NOTIFICATION_TOPIC: UUri = UUriFactory.fromProto(USUBSCRIPTION, 0x8000)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@ interface Notifier {
*
* @param topic The topic to send the notification to.
* @param destination The destination to send the notification to.
* @param options [CallOptions] for the notification.
* @param payload The payload to send with the notification.
* @return Returns the [UStatus] with the status of the notification.
*/
suspend fun notify(topic: UUri, destination: UUri, payload: UPayload?): UStatus
suspend fun notify(
topic: UUri,
destination: UUri,
options: CallOptions = CallOptions(),
payload: UPayload? = null
): UStatus


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ interface Publisher {
* Publish a message to a topic passing [UPayload] as the payload.
*
* @param topic The topic to publish to.
* @param options The [CallOptions] for the publish.
* @param payload The [UPayload] to publish.
* @return
*/
suspend fun publish(topic: UUri, payload: UPayload?): UStatus
suspend fun publish(topic: UUri, options: CallOptions = CallOptions(), payload: UPayload? = null): UStatus
}
Loading

0 comments on commit 97950d0

Please sign in to comment.