-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial Commit of uP-L2 Communication Layer APIs #119
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
/** | ||
* SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation | ||
* | ||
* See the NOTICE file(s) distributed with this work for additional | ||
* information regarding copyright ownership. | ||
* | ||
* This program and the accompanying materials are made available under the | ||
* terms of the Apache License Version 2.0 which is available at | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.eclipse.uprotocol.communication; | ||
|
||
import java.util.Objects; | ||
|
||
import org.eclipse.uprotocol.v1.UPriority; | ||
|
||
/** | ||
* This class is used to pass metadata to method invocation on the client side. | ||
*/ | ||
public record CallOptions (Integer timeout, UPriority priority, String token) { | ||
public static final int TIMEOUT_DEFAULT = 10000; // Default timeout of 10 seconds | ||
|
||
// Default instance. | ||
public static final CallOptions DEFAULT = new CallOptions(TIMEOUT_DEFAULT, UPriority.UPRIORITY_CS4, ""); | ||
|
||
/** | ||
* Check to ensure CallOptions is not null. | ||
*/ | ||
public CallOptions { | ||
Objects.requireNonNull(timeout); | ||
Objects.requireNonNull(priority); | ||
Objects.requireNonNull(token); | ||
} | ||
|
||
/** | ||
* Constructor for CallOptions. | ||
* | ||
* @param timeout The timeout for the method invocation. | ||
* @param priority The priority of the method invocation. | ||
*/ | ||
public CallOptions(Integer timeout, UPriority priority) { | ||
this(timeout, priority, ""); | ||
} | ||
|
||
/** | ||
* Constructor for CallOptions. | ||
* | ||
* @param timeout The timeout for the method invocation. | ||
*/ | ||
public CallOptions(Integer timeout) { | ||
this(timeout, UPriority.UPRIORITY_CS4, ""); | ||
} | ||
|
||
/** | ||
* Constructor for CallOptions. | ||
*/ | ||
public CallOptions() { | ||
this(TIMEOUT_DEFAULT, UPriority.UPRIORITY_CS4, ""); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/** | ||
* SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation | ||
* | ||
* See the NOTICE file(s) distributed with this work for additional | ||
* information regarding copyright ownership. | ||
* | ||
* This program and the accompanying materials are made available under the | ||
* terms of the Apache License Version 2.0 which is available at | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.eclipse.uprotocol.communication; | ||
|
||
import java.util.Objects; | ||
|
||
import org.eclipse.uprotocol.transport.UListener; | ||
import org.eclipse.uprotocol.transport.UTransport; | ||
import org.eclipse.uprotocol.transport.builder.UMessageBuilder; | ||
import org.eclipse.uprotocol.v1.UStatus; | ||
import org.eclipse.uprotocol.v1.UUri; | ||
|
||
/** | ||
* Default implementation of the {@link Notifier} API that uses the {@link UTransport} interface. | ||
*/ | ||
public class DefaultNotifier implements Notifier { | ||
// The transport to use for sending the RPC requests | ||
private final UTransport transport; | ||
|
||
/** | ||
* Constructor for the DefaultNotifier. | ||
* | ||
* @param transport the transport to use for sending the notifications | ||
*/ | ||
public DefaultNotifier (UTransport transport) { | ||
Objects.requireNonNull(transport, UTransport.TRANSPORT_NULL_ERROR); | ||
this.transport = transport; | ||
} | ||
|
||
|
||
/** | ||
* Send a notification to a given topic. <br> | ||
* | ||
* @param topic The topic to send the notification to. | ||
* @param destination The destination to send the notification to. | ||
* @param payload The payload to send with the notification. | ||
* @return Returns the {@link UStatus} with the status of the notification. | ||
*/ | ||
@Override | ||
public UStatus notify(UUri topic, UUri destination, UPayload payload) { | ||
UMessageBuilder builder = UMessageBuilder.notification(topic, destination); | ||
return transport.send((payload == null) ? builder.build() : | ||
builder.build(payload)); | ||
} | ||
|
||
|
||
/** | ||
* Register a listener for a notification topic. <br> | ||
* | ||
* @param topic The topic to register the listener to. | ||
* @param listener The listener to be called when a message is received on the topic. | ||
* @return Returns the {@link UStatus} with the status of the listener registration. | ||
*/ | ||
@Override | ||
public UStatus registerNotificationListener(UUri topic, UListener listener) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmmm, looking at the functions that return UStatus... maybe we could add a map and flatMap to UStatus so we can easily compose functionality. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In RpcResult you mean. That would mean I no longer use proto defined uStatus. could we table this as something we can do later? I agree that building the ustatus right now is clunky. |
||
return transport.registerListener(topic, transport.getSource(), listener); | ||
} | ||
|
||
|
||
/** | ||
* Unregister a listener from a notification topic. <br> | ||
* | ||
* @param topic The topic to unregister the listener from. | ||
* @param listener The listener to be unregistered from the topic. | ||
* @return Returns the {@link UStatus} with the status of the listener that was unregistered. | ||
*/ | ||
@Override | ||
public UStatus unregisterNotificationListener(UUri topic, UListener listener) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. another thing with the wrapping is the registering of notifications. If we added a way to say register to notifications that are high priority since they all need to be handled in the same way, I can't. |
||
return transport.unregisterListener(topic, transport.getSource(), listener); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
/** | ||
* SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation | ||
* | ||
* See the NOTICE file(s) distributed with this work for additional | ||
* information regarding copyright ownership. | ||
* | ||
* This program and the accompanying materials are made available under the | ||
* terms of the Apache License Version 2.0 which is available at | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.eclipse.uprotocol.communication; | ||
|
||
import java.util.Objects; | ||
import org.eclipse.uprotocol.transport.UTransport; | ||
import org.eclipse.uprotocol.transport.builder.UMessageBuilder; | ||
import org.eclipse.uprotocol.v1.UStatus; | ||
import org.eclipse.uprotocol.v1.UUri; | ||
|
||
public class DefaultPublisher implements Publisher { | ||
// The transport to use for sending the RPC requests | ||
private final UTransport transport; | ||
|
||
/** | ||
* Constructor for the DefaultPublisher. | ||
* | ||
* @param transport the transport to use for sending the notifications | ||
*/ | ||
public DefaultPublisher (UTransport transport) { | ||
Objects.requireNonNull(transport, UTransport.TRANSPORT_NULL_ERROR); | ||
this.transport = transport; | ||
} | ||
|
||
/** | ||
* Publish a message to a topic passing {@link UPayload} as the payload. | ||
* | ||
* @param topic The topic to publish to. | ||
* @param payload The {@link UPayload} to publish. | ||
* @return | ||
*/ | ||
@Override | ||
public UStatus publish(UUri topic, UPayload payload) { | ||
Objects.requireNonNull(topic, "Publish topic missing"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same comment as before, you don't need to recheck |
||
UMessageBuilder builder = UMessageBuilder.publish(topic); | ||
|
||
return transport.send(builder.build(payload)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
/** | ||
* SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation | ||
* | ||
* See the NOTICE file(s) distributed with this work for additional | ||
* information regarding copyright ownership. | ||
* | ||
* This program and the accompanying materials are made available under the | ||
* terms of the Apache License Version 2.0 which is available at | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.eclipse.uprotocol.communication; | ||
|
||
import java.util.Objects; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CompletionStage; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import org.eclipse.uprotocol.transport.UListener; | ||
import org.eclipse.uprotocol.transport.UTransport; | ||
import org.eclipse.uprotocol.transport.builder.UMessageBuilder; | ||
import org.eclipse.uprotocol.uri.factory.UriFactory; | ||
import org.eclipse.uprotocol.v1.UAttributes; | ||
import org.eclipse.uprotocol.v1.UCode; | ||
import org.eclipse.uprotocol.v1.UMessage; | ||
import org.eclipse.uprotocol.v1.UMessageType; | ||
import org.eclipse.uprotocol.v1.UStatus; | ||
import org.eclipse.uprotocol.v1.UUID; | ||
import org.eclipse.uprotocol.v1.UUri; | ||
|
||
public class DefaultRpcClient implements RpcClient { | ||
// The transport to use for sending the RPC requests | ||
private final UTransport transport; | ||
|
||
// Map to store the futures that needs to be completed when the response comes in | ||
private final ConcurrentHashMap<UUID, CompletableFuture<UMessage>> mRequests = new ConcurrentHashMap<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be renamed from Default to something else - maybe inMemoryRpcClient or defaultInMemoryRpcClient since this is a very specific implementation that IMHO should be communicated to the L2 user. |
||
|
||
// Generic listener to handle all RPC response messages | ||
private final UListener mResponseHandler = this::handleResponses; | ||
|
||
|
||
/** | ||
* Constructor for the DefaultRpcClient. | ||
* | ||
* @param transport the transport to use for sending the RPC requests | ||
*/ | ||
public DefaultRpcClient (UTransport transport) { | ||
Objects.requireNonNull(transport, UTransport.TRANSPORT_NULL_ERROR); | ||
this.transport = transport; | ||
|
||
UStatus status = transport.registerListener(UriFactory.ANY, transport.getSource(), mResponseHandler); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not so good practice to have a constructor or a class with state do these kinds of things, especially if the registerListener has its own lock on its Map of listeners in memory. Personally, I don't like this kind of software coding practice |
||
if (!status.getCode().equals(UCode.OK)) { | ||
throw new UStatusException(status.getCode(), "Failed to register listener"); | ||
} | ||
} | ||
|
||
|
||
/** | ||
* Invoke a method (send an RPC request) and receive the response | ||
* (the returned {@link CompletionStage} {@link UPayload}. <br> | ||
* | ||
* @param methodUri The method URI to be invoked. | ||
* @param requestPayload The request message to be sent to the server. | ||
* @param options RPC method invocation call options, see {@link CallOptions} | ||
* @return Returns the CompletionStage with the response payload or exception with the failure | ||
* reason as {@link UStatus}. | ||
*/ | ||
@Override | ||
public CompletionStage<UPayload> invokeMethod(UUri methodUri, UPayload requestPayload, CallOptions options) { | ||
options = Objects.requireNonNullElse(options, CallOptions.DEFAULT); | ||
UMessageBuilder builder = UMessageBuilder.request(transport.getSource(), methodUri, options.timeout()); | ||
UMessage request; | ||
|
||
try { | ||
if (!options.token().isBlank()) { | ||
builder.withToken(options.token()); | ||
} | ||
|
||
// Build a request uMessage | ||
request = builder.build(requestPayload); | ||
|
||
return mRequests.compute(request.getAttributes().getId(), (requestId, currentRequest) -> { | ||
if (currentRequest != null) { | ||
throw new UStatusException(UCode.ABORTED, "Duplicated request found"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. throwing exceptions is an expensive operation - would it not be better to just return the completed future with an error - like what you have in the catch statement? |
||
} | ||
|
||
final UStatus status = transport.send(request); | ||
if (status.getCode().equals(UCode.OK)) { | ||
final CompletableFuture<UMessage> responseFuture = new CompletableFuture<UMessage>() | ||
.orTimeout(request.getAttributes().getTtl(), TimeUnit.MILLISECONDS); | ||
|
||
responseFuture.whenComplete((responseMessage, exception) -> { | ||
mRequests.remove(request.getAttributes().getId()); | ||
}); | ||
|
||
return responseFuture; | ||
} else { | ||
throw new UStatusException(status); | ||
} | ||
}).thenApply(responseMessage -> { | ||
return UPayload.pack(responseMessage.getPayload(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. when I get the UPayload back, how do I know the type of the message inside? |
||
responseMessage.getAttributes().getPayloadFormat()); | ||
}); | ||
} catch (Exception e) { | ||
return CompletableFuture.failedFuture(e); | ||
} | ||
} | ||
|
||
public void close() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If someone forgets to call close then we are left with a transport listening to everything |
||
mRequests.clear(); | ||
transport.unregisterListener(UriFactory.ANY, transport.getSource(), mResponseHandler); | ||
} | ||
|
||
/** | ||
* Handle the responses coming back from the server | ||
* @param response The response message from the server | ||
*/ | ||
private void handleResponses(UMessage response) { | ||
// Only handle responses messages, ignore all other messages like notifications | ||
if (response.getAttributes().getType() != UMessageType.UMESSAGE_TYPE_RESPONSE) { | ||
return; | ||
} | ||
|
||
final UAttributes responseAttributes = response.getAttributes(); | ||
|
||
// Check if the response is for a request we made, if not then ignore it | ||
final CompletableFuture<UMessage> responseFuture = mRequests.remove(responseAttributes.getReqid()); | ||
if (responseFuture == null) { | ||
return; | ||
} | ||
|
||
// Check if the response has a commstatus and if it is not OK then complete the future with an exception | ||
if (responseAttributes.hasCommstatus()) { | ||
final UCode code = responseAttributes.getCommstatus(); | ||
responseFuture.completeExceptionally( | ||
new UStatusException(code, "Communication error [" + code + "]")); | ||
return; | ||
} | ||
responseFuture.complete(response); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when working with records it is custom to create static factory methods instead of constructors
just makes it nicer to use
Another thing are the exceptions barfed in the record validation making this difficult to use in a functional way - everything now needs to handle these exceptions.
I guess it is just me not liking code with exceptions and null pointer exceptions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tamarafischer is this a must change? what would the static methods look like?