Skip to content

Commit

Permalink
Init Commit for 1.5.8-L2
Browse files Browse the repository at this point in the history
  • Loading branch information
dddj698 committed Jun 27, 2024
1 parent 2683c57 commit 22335ca
Show file tree
Hide file tree
Showing 41 changed files with 2,944 additions and 213 deletions.
10 changes: 9 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
</executions>
<configuration>
<includes>
<include>**/communication/**</include>
<include>**/transport/**</include>
<include>**/uri/**</include>
<include>**/uuid/**</include>
Expand Down Expand Up @@ -195,7 +196,7 @@
</execution>
</executions>
<configuration>
<jvmTarget>${maven.compiler.target}</jvmTarget>
<jvmTarget>17</jvmTarget>
</configuration>
</plugin>
<plugin>
Expand Down Expand Up @@ -360,6 +361,13 @@
</dependencyManagement>

<dependencies>
<dependency>
<groupId>io.mockk</groupId>
<artifactId>mockk-jvm</artifactId>
<version>1.13.11</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
Expand Down
34 changes: 34 additions & 0 deletions src/main/kotlin/org/eclipse/uprotocol/communication/CallOptions.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.eclipse.uprotocol.communication

import org.eclipse.uprotocol.v1.UPriority

/**
* This class is used to pass metadata to method invocation on the client side.
* @param timeout How long we should wait for a request to be processed by the server.
* @param priority The priority of the request, default per the spec is CS4.
* @param token Token that is used for TAP.
*/
data class CallOptions(
val timeout: Int = TIMEOUT_DEFAULT,
val priority: UPriority = UPriority.UPRIORITY_CS4,
val token: String = ""
) {
companion object {
/**
* Default timeout of 10 seconds (measured in milliseconds).
*/
const val TIMEOUT_DEFAULT: Int = 10000
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/**
* SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.eclipse.uprotocol.communication

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.eclipse.uprotocol.transport.*
import org.eclipse.uprotocol.uri.factory.UUriFactory
import org.eclipse.uprotocol.v1.*
import org.eclipse.uprotocol.v1.UUID

/**
* The following is an example implementation of the [RpcClient] interface that
* wraps the [UTransport] for implementing the RPC pattern to send
* RPC requests and receive RPC responses. This implementation uses an in-memory
* map to store the futures that needs to be completed when the response comes in from the server.
*
* *NOTE:* Developers are not required to use these APIs, they can implement their own
* or directly use the [UTransport] to send RPC requests and register listeners that
* handle the RPC responses.
*
* @param transport the transport to use for sending the RPC requests
*/
class InMemoryRpcClient(
private val transport: UTransport,
dispatcher: CoroutineDispatcher = Dispatchers.IO
) : RpcClient {
// Map to store the futures that needs to be completed when the response comes in
private val mRequests = HashMap<UUID, CompletableDeferred<UMessage>>()

// Generic listener to handle all RPC response messages
private val mResponseHandler = UListener { response: UMessage ->
this.handleResponses(response)
}

private val scope = CoroutineScope(SupervisorJob() + dispatcher)

private val mutex = Mutex()

init {
scope.launch {
transport.registerListener(
UUriFactory.ANY,
transport.getSource(), mResponseHandler
)
}
}


/**
* Invoke a method (send an RPC request) and receive the response
* the returned [UPayload] wrapped in [Result].
*
* @param methodUri The method URI to be invoked.
* @param requestPayload The request message to be sent to the server.
* @param options RPC method invocation call options, see [CallOptions]
* @return Returns the [Result] with the response [UPayload] or exception with the failure
* reason as [UStatus].
*/
override suspend fun invokeMethod(
methodUri: UUri,
requestPayload: UPayload,
options: CallOptions
): Result<UPayload> {
try {
val request = uMessage {
forRequest(transport.getSource(), methodUri, options.timeout)
if (options.token.isNotBlank()) {
setToken(options.token)
}
setPayload(requestPayload)
}
transport.send(request).takeIf { it.code != UCode.OK }?.let {
throw UStatusException(it)
}
val result = withTimeout(request.attributes.ttl.toLong()) {
mutex.withLock {
val currentRequest = mRequests[request.attributes.id]
if (currentRequest != null) {
throw UStatusException(UCode.ALREADY_EXISTS, "Duplicated request found")
}
val response = CompletableDeferred<UMessage>()
mRequests[request.attributes.id] = response
response
}.await()
}
return Result.success(UPayload.pack(result.payload, result.attributes.payloadFormat))
} catch (e: Exception) {
return when (e) {
is UStatusException -> {
Result.failure(e)
}

is TimeoutCancellationException -> {
Result.failure(UStatusException(UCode.DEADLINE_EXCEEDED, "Request timed out"))
}

else -> {
Result.failure(UStatusException(UCode.UNKNOWN, e.message))
}
}
}
}

fun close() {
mRequests.clear()
scope.launch {
transport.unregisterListener(UUriFactory.ANY, transport.getSource(), mResponseHandler)
}
}

/**
* Handle the responses coming back from the server
* @param response The response message from the server
*/
private suspend fun handleResponses(response: UMessage) {
// Only handle responses messages, ignore all other messages like notifications
if (response.attributes.type != UMessageType.UMESSAGE_TYPE_RESPONSE) {
return
}
// Check if the response is for a request we made, if not then ignore it
val responseDeferred = mutex.withLock { mRequests.remove(response.attributes.reqid) } ?: return
// Check if the response has a commstatus and if it is not OK then complete the future with an exception
if (response.attributes.hasCommstatus()) {
val code = response.attributes.commstatus
responseDeferred.completeExceptionally(UStatusException(code, "Communication error [$code]"))
} else {
responseDeferred.complete(response)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/**
* SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.eclipse.uprotocol.communication

import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.eclipse.uprotocol.transport.*
import org.eclipse.uprotocol.uri.factory.UUriFactory
import org.eclipse.uprotocol.v1.*
import java.util.*

/**
* The following is an example implementation of the [RpcServer] interface that
* wraps the [UTransport] for implementing the server-side of the RPC pattern
* to register handlers for processing RPC requests from clients. This implementation
* uses an in-memory map to store the request handlers that needs to be invoked when the
* request comes in from the client.
*
* *NOTE:* Developers are not required to use these APIs, they can implement their own
* or directly use the [UTransport] to register listeners that handle
* RPC requests and send RPC responses.
*
* @param transport The transport to use for sending the RPC requests.
*/
class InMemoryRpcServer(private val transport: UTransport) : RpcServer {

// Map to store the request handlers, so we can handle the right request on the server side
private val mRequestsHandlers = HashMap<UUri, RequestHandler>()

// Generic listener to handle all RPC request messages
private val mRequestHandler = UListener { message ->
handleRequests(message)
}

private val handlerLock = Mutex()

/**
* Register a handler that will be invoked when requests come in from clients for the given method.
*
* Note: Only one handler is allowed to be registered per method URI.
*
* @param method Uri for the method to register the listener for.
* @param handler The handler that will process the request for the client.
* @return Returns the status of registering the RpcListener.
*/
override suspend fun registerRequestHandler(method: UUri, handler: RequestHandler): UStatus {
// Ensure the method URI matches the transport source URI
if (method.authorityName != transport.getSource().authorityName ||
method.ueId != transport.getSource().ueId ||
method.ueVersionMajor != transport.getSource().ueVersionMajor
) {
return uStatus {
code = UCode.INVALID_ARGUMENT
message = "Method URI does not match the transport source URI"
}
}
return handlerLock.withLock {
mRequestsHandlers[method]?.let {
uStatus {
code = UCode.ALREADY_EXISTS
message = "Handler already registered"
}
} ?: run {
val status: UStatus = transport.registerListener(UUriFactory.ANY, method, mRequestHandler)
if (status.code == UCode.OK) {
mRequestsHandlers[method] = handler
}
status
}
}
}


/**
* Unregister a handler that will be invoked when requests come in from clients for the given method.
*
* @param method Resolved UUri for where the listener was registered to receive messages from.
* @param handler The handler for processing requests
* @return Returns status of registering the RpcListener.
*/
override suspend fun unregisterRequestHandler(method: UUri, handler: RequestHandler): UStatus {
// Ensure the method URI matches the transport source URI
if (method.authorityName != transport.getSource().authorityName ||
method.ueId != transport.getSource().ueId ||
method.ueVersionMajor != transport.getSource().ueVersionMajor
) {
return uStatus {
code = UCode.INVALID_ARGUMENT
message = "Method URI does not match the transport source URI"
}
}

val removeResult = handlerLock.withLock { mRequestsHandlers.remove(method, handler) }
if (removeResult) {
return transport.unregisterListener(UUriFactory.ANY, method, mRequestHandler)
}

return uStatus {
code = UCode.NOT_FOUND
message = "Handler not found"
}
}


/**
* Generic incoming handler to process RPC requests from clients
* @param request The request message from clients
*/
private suspend fun handleRequests(request: UMessage) {
// Only handle request messages, ignore all other messages like notifications
if (request.attributes.type != UMessageType.UMESSAGE_TYPE_REQUEST) {
return
}
// Check if the request is for one that we have registered a handler for, if not ignore it
val handler = mRequestsHandlers[request.attributes.sink] ?: return
transport.send(uMessage {
forResponse(request.attributes)
runCatching{
handler.handleRequest(request)
}.getOrElse { e ->
val code = if (e is UStatusException) {
e.status.code
} else {
UCode.INTERNAL
}
setCommStatus(code)
null
}?.let {
setPayload(it)
}
})
}
}
Loading

0 comments on commit 22335ca

Please sign in to comment.