Skip to content

Commit

Permalink
Initial Commit of uP-L2 Communication Layer APIs
Browse files Browse the repository at this point in the history
The following commit includes a proposed common L2 API interfaces that is implemented as static methods that use uTransport (stateless).
  • Loading branch information
czfdcn committed Jun 17, 2024
1 parent fd88345 commit 8c564fe
Show file tree
Hide file tree
Showing 42 changed files with 4,349 additions and 59 deletions.
62 changes: 62 additions & 0 deletions src/main/java/org/eclipse/uprotocol/communication/CallOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.eclipse.uprotocol.communication;

import java.util.Objects;

import org.eclipse.uprotocol.v1.UPriority;

/**
* This class is used to pass metadata to method invocation on the client side.
*/
public record CallOptions (Integer timeout, UPriority priority, String token) {
public static final int TIMEOUT_DEFAULT = 10000; // Default timeout of 10 seconds

// Default instance.
public static final CallOptions DEFAULT = new CallOptions(TIMEOUT_DEFAULT, UPriority.UPRIORITY_CS4, "");

/**
* Check to ensure CallOptions is not null.
*/
public CallOptions {
Objects.requireNonNull(timeout);
Objects.requireNonNull(priority);
Objects.requireNonNull(token);
}

/**
* Constructor for CallOptions.
*
* @param timeout The timeout for the method invocation.
* @param priority The priority of the method invocation.
*/
public CallOptions(Integer timeout, UPriority priority) {
this(timeout, priority, "");
}

/**
* Constructor for CallOptions.
*
* @param timeout The timeout for the method invocation.
*/
public CallOptions(Integer timeout) {
this(timeout, UPriority.UPRIORITY_CS4, "");
}

/**
* Constructor for CallOptions.
*/
public CallOptions() {
this(TIMEOUT_DEFAULT, UPriority.UPRIORITY_CS4, "");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.eclipse.uprotocol.communication;

import java.util.Objects;

import org.eclipse.uprotocol.transport.UListener;
import org.eclipse.uprotocol.transport.UTransport;
import org.eclipse.uprotocol.transport.builder.UMessageBuilder;
import org.eclipse.uprotocol.v1.UStatus;
import org.eclipse.uprotocol.v1.UUri;

/**
* Default implementation of the {@link Notifier} API that uses the {@link UTransport} interface.
*/
public class DefaultNotifier implements Notifier {
// The transport to use for sending the RPC requests
private UTransport transport;

/**
* Constructor for the DefaultNotifier.
*
* @param transport the transport to use for sending the notifications
*/
protected DefaultNotifier (UTransport transport) {
Objects.requireNonNull(transport, UTransport.TRANSPORT_NULL_ERROR);
this.transport = transport;
}


/**
* Send a notification to a given topic. <br>
*
* @param topic The topic to send the notification to.
* @param destination The destination to send the notification to.
* @param payload The payload to send with the notification.
* @return Returns the {@link UStatus} with the status of the notification.
*/
@Override
public UStatus notify(UUri topic, UUri destination, UPayload payload) {
Objects.requireNonNull(transport, UTransport.TRANSPORT_NULL_ERROR);
UMessageBuilder builder = UMessageBuilder.notification(topic, destination);
return transport.send((payload == null) ? builder.build() :
builder.build(payload));
}


/**
* Register a listener for a notification topic. <br>
*
* @param topic The topic to register the listener to.
* @param listener The listener to be called when a message is received on the topic.
* @return Returns the {@link UStatus} with the status of the listener registration.
*/
@Override
public UStatus registerNotificationListener(UUri topic, UListener listener) {
Objects.requireNonNull(transport, UTransport.TRANSPORT_NULL_ERROR);
return transport.registerListener(topic, transport.getSource(), listener);
}


/**
* Unregister a listener from a notification topic. <br>
*
* @param topic The topic to unregister the listener from.
* @param listener The listener to be unregistered from the topic.
* @return Returns the {@link UStatus} with the status of the listener that was unregistered.
*/
@Override
public UStatus unregisterNotificationListener(UUri topic, UListener listener) {
Objects.requireNonNull(transport, UTransport.TRANSPORT_NULL_ERROR);
return transport.unregisterListener(topic, transport.getSource(), listener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.eclipse.uprotocol.communication;

import java.util.Objects;
import org.eclipse.uprotocol.transport.UTransport;
import org.eclipse.uprotocol.transport.builder.UMessageBuilder;
import org.eclipse.uprotocol.v1.UStatus;
import org.eclipse.uprotocol.v1.UUri;

public class DefaultPublisher implements Publisher {
// The transport to use for sending the RPC requests
private UTransport transport;

/**
* Constructor for the DefaultPublisher.
*
* @param transport the transport to use for sending the notifications
*/
public DefaultPublisher (UTransport transport) {
Objects.requireNonNull(transport, UTransport.TRANSPORT_NULL_ERROR);
this.transport = transport;
}

/**
* Publish a message to a topic passing {@link UPayload} as the payload.
*
* @param topic The topic to publish to.
* @param payload The {@link UPayload} to publish.
* @return
*/
@Override
public UStatus publish(UUri topic, UPayload payload) {
Objects.requireNonNull(topic, "Publish topic missing");
UMessageBuilder builder = UMessageBuilder.publish(topic);

return transport.send(builder.build(payload));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/**
* SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.eclipse.uprotocol.communication;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.eclipse.uprotocol.transport.UListener;
import org.eclipse.uprotocol.transport.UTransport;
import org.eclipse.uprotocol.transport.builder.UMessageBuilder;
import org.eclipse.uprotocol.uri.factory.UriFactory;
import org.eclipse.uprotocol.v1.UAttributes;
import org.eclipse.uprotocol.v1.UCode;
import org.eclipse.uprotocol.v1.UMessage;
import org.eclipse.uprotocol.v1.UMessageType;
import org.eclipse.uprotocol.v1.UStatus;
import org.eclipse.uprotocol.v1.UUID;
import org.eclipse.uprotocol.v1.UUri;

public class DefaultRpcClient implements RpcClient {
// The transport to use for sending the RPC requests
private UTransport transport;

// Map to store the futures that needs to be completed when the response comes in
private final ConcurrentHashMap<UUID, CompletableFuture<UMessage>> mRequests = new ConcurrentHashMap<>();

// Generic listener to handle all RPC response messages
private final UListener mResponseHandler = this::handleResponses;


/**
* Constructor for the DefaultRpcClient.
*
* @param transport the transport to use for sending the RPC requests
*/
public DefaultRpcClient (UTransport transport) {
Objects.requireNonNull(transport, UTransport.TRANSPORT_NULL_ERROR);
this.transport = transport;

UStatus status = transport.registerListener(UriFactory.ANY, transport.getSource(), mResponseHandler);
if (!status.getCode().equals(UCode.OK)) {
throw new UStatusException(status.getCode(), "Failed to register listener");
}
}


/**
* Invoke a method (send an RPC request) and receive the response
* (the returned {@link CompletionStage} {@link UPayload}. <br>
*
* @param methodUri The method URI to be invoked.
* @param requestPayload The request message to be sent to the server.
* @param options RPC method invocation call options, see {@link CallOptions}
* @return Returns the CompletionStage with the response payload or exception with the failure
* reason as {@link UStatus}.
*/
@Override
public CompletionStage<UPayload> invokeMethod(UUri methodUri, UPayload requestPayload, CallOptions options) {
options = Objects.requireNonNullElse(options, CallOptions.DEFAULT);
UMessageBuilder builder = UMessageBuilder.request(transport.getSource(), methodUri, options.timeout());
UMessage request;

try {
if (!options.token().isBlank()) {
builder.withToken(options.token());
}

// Build a request uMessage
request = builder.build(requestPayload);

return mRequests.compute(request.getAttributes().getId(), (requestId, currentRequest) -> {
if (currentRequest != null) {
throw new UStatusException(UCode.ABORTED, "Duplicated request found");
}

final UStatus status = transport.send(request);
if (status.getCode().equals(UCode.OK)) {
final CompletableFuture<UMessage> responseFuture = new CompletableFuture<UMessage>()
.orTimeout(request.getAttributes().getTtl(), TimeUnit.MILLISECONDS);

responseFuture.whenComplete((responseMessage, exception) -> {
mRequests.remove(request.getAttributes().getId());
});

return responseFuture;
} else {
throw new UStatusException(status);
}
}).thenApply(responseMessage -> {
return UPayload.pack(responseMessage.getPayload(),
responseMessage.getAttributes().getPayloadFormat());
});
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}

public void close() {
mRequests.clear();
transport.unregisterListener(UriFactory.ANY, transport.getSource(), mResponseHandler);
}

/**
* Handle the responses coming back from the server
* @param response The response message from the server
*/
private void handleResponses(UMessage response) {
// Only handle responses messages, ignore all other messages like notifications
if (response.getAttributes().getType() != UMessageType.UMESSAGE_TYPE_RESPONSE) {
return;
}

final UAttributes responseAttributes = response.getAttributes();

// Check if the response is for a request we made, if not then ignore it
final CompletableFuture<UMessage> responseFuture = mRequests.remove(responseAttributes.getReqid());
if (responseFuture == null) {
return;
}

// Check if the response has a commstatus and if it is not OK then complete the future with an exception
if (responseAttributes.hasCommstatus()) {
final UCode code = responseAttributes.getCommstatus();
responseFuture.completeExceptionally(
new UStatusException(code, "Communication error [" + code + "]"));
return;
}
responseFuture.complete(response);
}
}
Loading

0 comments on commit 8c564fe

Please sign in to comment.