Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Commit of uP-L2 Communication Layer APIs #119

Merged
merged 2 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions src/main/java/org/eclipse/uprotocol/communication/CallOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.eclipse.uprotocol.communication;

import java.util.Objects;

import org.eclipse.uprotocol.v1.UPriority;

/**
* This class is used to pass metadata to method invocation on the client side.
*/
public record CallOptions (Integer timeout, UPriority priority, String token) {
public static final int TIMEOUT_DEFAULT = 10000; // Default timeout of 10 seconds

// Default instance.
public static final CallOptions DEFAULT = new CallOptions(TIMEOUT_DEFAULT, UPriority.UPRIORITY_CS4, "");

/**
* Check to ensure CallOptions is not null.
*/
public CallOptions {
Objects.requireNonNull(timeout);
Objects.requireNonNull(priority);
Objects.requireNonNull(token);
}

/**
* Constructor for CallOptions.
*
* @param timeout The timeout for the method invocation.
* @param priority The priority of the method invocation.
*/
public CallOptions(Integer timeout, UPriority priority) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when working with records it is custom to create static factory methods instead of constructors
just makes it nicer to use
Another thing are the exceptions barfed in the record validation making this difficult to use in a functional way - everything now needs to handle these exceptions.

I guess it is just me not liking code with exceptions and null pointer exceptions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tamarafischer is this a must change? what would the static methods look like?

this(timeout, priority, "");
}

/**
* Constructor for CallOptions.
*
* @param timeout The timeout for the method invocation.
*/
public CallOptions(Integer timeout) {
this(timeout, UPriority.UPRIORITY_CS4, "");
}

/**
* Constructor for CallOptions.
*/
public CallOptions() {
this(TIMEOUT_DEFAULT, UPriority.UPRIORITY_CS4, "");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.eclipse.uprotocol.communication;

import java.util.Objects;

import org.eclipse.uprotocol.transport.UListener;
import org.eclipse.uprotocol.transport.UTransport;
import org.eclipse.uprotocol.transport.builder.UMessageBuilder;
import org.eclipse.uprotocol.v1.UStatus;
import org.eclipse.uprotocol.v1.UUri;

/**
* Default implementation of the {@link Notifier} API that uses the {@link UTransport} interface.
*/
public class DefaultNotifier implements Notifier {
// The transport to use for sending the RPC requests
private final UTransport transport;

/**
* Constructor for the DefaultNotifier.
*
* @param transport the transport to use for sending the notifications
*/
public DefaultNotifier (UTransport transport) {
Objects.requireNonNull(transport, UTransport.TRANSPORT_NULL_ERROR);
this.transport = transport;
}


/**
* Send a notification to a given topic. <br>
*
* @param topic The topic to send the notification to.
* @param destination The destination to send the notification to.
* @param payload The payload to send with the notification.
* @return Returns the {@link UStatus} with the status of the notification.
*/
@Override
public UStatus notify(UUri topic, UUri destination, UPayload payload) {
UMessageBuilder builder = UMessageBuilder.notification(topic, destination);
return transport.send((payload == null) ? builder.build() :
builder.build(payload));
}


/**
* Register a listener for a notification topic. <br>
*
* @param topic The topic to register the listener to.
* @param listener The listener to be called when a message is received on the topic.
* @return Returns the {@link UStatus} with the status of the listener registration.
*/
@Override
public UStatus registerNotificationListener(UUri topic, UListener listener) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, looking at the functions that return UStatus... maybe we could add a map and flatMap to UStatus so we can easily compose functionality.
There is an example in the RpcMapper class

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In RpcResult you mean. That would mean I no longer use proto defined uStatus. could we table this as something we can do later? I agree that building the ustatus right now is clunky.

return transport.registerListener(topic, transport.getSource(), listener);
}


/**
* Unregister a listener from a notification topic. <br>
*
* @param topic The topic to unregister the listener from.
* @param listener The listener to be unregistered from the topic.
* @return Returns the {@link UStatus} with the status of the listener that was unregistered.
*/
@Override
public UStatus unregisterNotificationListener(UUri topic, UListener listener) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another thing with the wrapping is the registering of notifications. If we added a way to say register to notifications that are high priority since they all need to be handled in the same way, I can't.
I could add these enhancements to uTransport but then would have to go and change all the wrapper implementations or developers would need to work directly with uTransport.
Another downside of this wrapping is the inability to enhance uTransport easily - all wrappers need to be changed as well

return transport.unregisterListener(topic, transport.getSource(), listener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.eclipse.uprotocol.communication;

import java.util.Objects;
import org.eclipse.uprotocol.transport.UTransport;
import org.eclipse.uprotocol.transport.builder.UMessageBuilder;
import org.eclipse.uprotocol.v1.UStatus;
import org.eclipse.uprotocol.v1.UUri;

public class DefaultPublisher implements Publisher {
// The transport to use for sending the RPC requests
private final UTransport transport;

/**
* Constructor for the DefaultPublisher.
*
* @param transport the transport to use for sending the notifications
*/
public DefaultPublisher (UTransport transport) {
Objects.requireNonNull(transport, UTransport.TRANSPORT_NULL_ERROR);
this.transport = transport;
}

/**
* Publish a message to a topic passing {@link UPayload} as the payload.
*
* @param topic The topic to publish to.
* @param payload The {@link UPayload} to publish.
* @return
*/
@Override
public UStatus publish(UUri topic, UPayload payload) {
Objects.requireNonNull(topic, "Publish topic missing");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as before, you don't need to recheck

UMessageBuilder builder = UMessageBuilder.publish(topic);

return transport.send(builder.build(payload));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/**
* SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.eclipse.uprotocol.communication;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.eclipse.uprotocol.transport.UListener;
import org.eclipse.uprotocol.transport.UTransport;
import org.eclipse.uprotocol.transport.builder.UMessageBuilder;
import org.eclipse.uprotocol.uri.factory.UriFactory;
import org.eclipse.uprotocol.v1.UAttributes;
import org.eclipse.uprotocol.v1.UCode;
import org.eclipse.uprotocol.v1.UMessage;
import org.eclipse.uprotocol.v1.UMessageType;
import org.eclipse.uprotocol.v1.UStatus;
import org.eclipse.uprotocol.v1.UUID;
import org.eclipse.uprotocol.v1.UUri;

public class DefaultRpcClient implements RpcClient {
// The transport to use for sending the RPC requests
private final UTransport transport;

// Map to store the futures that needs to be completed when the response comes in
private final ConcurrentHashMap<UUID, CompletableFuture<UMessage>> mRequests = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be renamed from Default to something else - maybe inMemoryRpcClient or defaultInMemoryRpcClient since this is a very specific implementation that IMHO should be communicated to the L2 user.


// Generic listener to handle all RPC response messages
private final UListener mResponseHandler = this::handleResponses;


/**
* Constructor for the DefaultRpcClient.
*
* @param transport the transport to use for sending the RPC requests
*/
public DefaultRpcClient (UTransport transport) {
Objects.requireNonNull(transport, UTransport.TRANSPORT_NULL_ERROR);
this.transport = transport;

UStatus status = transport.registerListener(UriFactory.ANY, transport.getSource(), mResponseHandler);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not so good practice to have a constructor or a class with state do these kinds of things, especially if the registerListener has its own lock on its Map of listeners in memory.
You could end up waiting and having an exception thrown on the class creation when it has nothing to do with creating the class but more to initializing the class.

Personally, I don't like this kind of software coding practice

if (!status.getCode().equals(UCode.OK)) {
throw new UStatusException(status.getCode(), "Failed to register listener");
}
}


/**
* Invoke a method (send an RPC request) and receive the response
* (the returned {@link CompletionStage} {@link UPayload}. <br>
*
* @param methodUri The method URI to be invoked.
* @param requestPayload The request message to be sent to the server.
* @param options RPC method invocation call options, see {@link CallOptions}
* @return Returns the CompletionStage with the response payload or exception with the failure
* reason as {@link UStatus}.
*/
@Override
public CompletionStage<UPayload> invokeMethod(UUri methodUri, UPayload requestPayload, CallOptions options) {
options = Objects.requireNonNullElse(options, CallOptions.DEFAULT);
UMessageBuilder builder = UMessageBuilder.request(transport.getSource(), methodUri, options.timeout());
UMessage request;

try {
if (!options.token().isBlank()) {
builder.withToken(options.token());
}

// Build a request uMessage
request = builder.build(requestPayload);

return mRequests.compute(request.getAttributes().getId(), (requestId, currentRequest) -> {
if (currentRequest != null) {
throw new UStatusException(UCode.ABORTED, "Duplicated request found");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throwing exceptions is an expensive operation - would it not be better to just return the completed future with an error - like what you have in the catch statement?

}

final UStatus status = transport.send(request);
if (status.getCode().equals(UCode.OK)) {
final CompletableFuture<UMessage> responseFuture = new CompletableFuture<UMessage>()
.orTimeout(request.getAttributes().getTtl(), TimeUnit.MILLISECONDS);

responseFuture.whenComplete((responseMessage, exception) -> {
mRequests.remove(request.getAttributes().getId());
});

return responseFuture;
} else {
throw new UStatusException(status);
}
}).thenApply(responseMessage -> {
return UPayload.pack(responseMessage.getPayload(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when I get the UPayload back, how do I know the type of the message inside?
If it is an Any, I should be ok, but if not, how do I unpack this to the correct protobuf Message and start working with the response in a nice way

responseMessage.getAttributes().getPayloadFormat());
});
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}

public void close() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If someone forgets to call close then we are left with a transport listening to everything
In addition, what if the unregisterListener task failed for some reason?
can it fail? Does the developer know it can fail?
When there are problems, this is going to be a nightmare to find exactly what part of the code failed and why I am receiving all these messages and not processing them and where they came from.
As a developer, you are hiding things and leaving it for me to debug later where I will have to reverse engineer this code

mRequests.clear();
transport.unregisterListener(UriFactory.ANY, transport.getSource(), mResponseHandler);
}

/**
* Handle the responses coming back from the server
* @param response The response message from the server
*/
private void handleResponses(UMessage response) {
// Only handle responses messages, ignore all other messages like notifications
if (response.getAttributes().getType() != UMessageType.UMESSAGE_TYPE_RESPONSE) {
return;
}

final UAttributes responseAttributes = response.getAttributes();

// Check if the response is for a request we made, if not then ignore it
final CompletableFuture<UMessage> responseFuture = mRequests.remove(responseAttributes.getReqid());
if (responseFuture == null) {
return;
}

// Check if the response has a commstatus and if it is not OK then complete the future with an exception
if (responseAttributes.hasCommstatus()) {
final UCode code = responseAttributes.getCommstatus();
responseFuture.completeExceptionally(
new UStatusException(code, "Communication error [" + code + "]"));
return;
}
responseFuture.complete(response);
}
}
Loading
Loading