Skip to content

Commit

Permalink
InMemorySubscriber Functionality (#135)
Browse files Browse the repository at this point in the history
The following PR completes the functionality for the client-side of uSubscription service communication meaning to be able to subscribe and unsubscribe to topics. We handle the various subscription states and subscription change notifications now.
I've also closed the gap for code coverage using mockito so we now are at 100% coverage.

#129
#132
  • Loading branch information
Steven Hartley authored Jul 5, 2024
1 parent d485ced commit a3dc39f
Show file tree
Hide file tree
Showing 24 changed files with 1,146 additions and 515 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ build/
### VS Code ###
.vscode/
logging/
.VSCodeCounter

.metals
.bloop
Expand Down
36 changes: 23 additions & 13 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -325,14 +325,27 @@
<version>2.4.2</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.12.4</version>
<scope>test</scope>
</dependency>

<!-- Mockito JUnit Jupiter extension -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>3.12.4</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<version>2.11.0</version>
</dependency>


<dependency>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
Expand Down Expand Up @@ -364,14 +377,6 @@
<scope>test</scope>
</dependency>

<!--JUnit Jupiter Engine to depend on the JUnit5 engine and JUnit 5 API -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>

</dependencies>
</dependencyManagement>

Expand All @@ -380,14 +385,19 @@
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<version>3.14.1</version>
<scope>test</scope>
</dependency>

<!--JUnit Jupiter Engine to depend on the JUnit5 engine and JUnit 5 API -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<!-- Mockito JUnit Jupiter extension -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.CompletionException;

import org.eclipse.uprotocol.transport.UListener;
import org.eclipse.uprotocol.transport.UTransport;
import org.eclipse.uprotocol.transport.builder.UMessageBuilder;
Expand Down Expand Up @@ -83,54 +80,39 @@ public CompletionStage<UPayload> invokeMethod(UUri methodUri, UPayload requestPa
UMessageBuilder builder = UMessageBuilder.request(transport.getSource(), methodUri, options.timeout());
UMessage request;

try {
if (!options.token().isBlank()) {
builder.withToken(options.token());
}
// Build a request uMessage
request = builder.build(requestPayload);

// Create the response future and store it in mRequests
CompletableFuture<UMessage> responseFuture = new CompletableFuture<UMessage>()
.orTimeout(request.getAttributes().getTtl(), TimeUnit.MILLISECONDS)
.handle((responseMessage, exception) -> {
if (exception != null) {
if (exception instanceof CompletionException) exception = exception.getCause();
if (exception instanceof TimeoutException) {
throw new UStatusException(UCode.DEADLINE_EXCEEDED, "Request timed out");
} else if (exception instanceof UStatusException) {
throw new UStatusException(((UStatusException) exception).getStatus());
} else {
throw new UStatusException(UCode.UNKNOWN, exception.getMessage());
}
}
return responseMessage;
});

responseFuture.whenComplete(
(responseMessage, exception) -> mRequests.remove(request.getAttributes().getId()));

mRequests.compute(request.getAttributes().getId(), (requestId, currentRequest) -> {
if (currentRequest != null)
throw new UStatusException(UCode.ALREADY_EXISTS, "Duplicated request found");
return responseFuture;
});

// Send the request
CompletionStage<UStatus> status = transport.send(request);

return status.thenApply(s -> {
if (s.getCode() != UCode.OK) throw new UStatusException(s);
return s;
}).thenCompose(s -> responseFuture.thenApply(responseMessage ->
UPayload.pack(responseMessage.getPayload(), responseMessage.getAttributes().getPayloadFormat())
));

} catch (Exception e) {
return CompletableFuture.failedFuture(e);
if (!options.token().isBlank()) {
builder.withToken(options.token());
}
// Build a request uMessage
request = builder.build(requestPayload);

// Create the response future and store it in mRequests
CompletableFuture<UMessage> responseFuture = new CompletableFuture<UMessage>()
.orTimeout(request.getAttributes().getTtl(), TimeUnit.MILLISECONDS)
.exceptionally(e -> {
throw new UStatusException(UCode.DEADLINE_EXCEEDED, "Request timed out");
})
.whenComplete((responseMessage, exception) -> mRequests.remove(request.getAttributes().getId()));

mRequests.compute(request.getAttributes().getId(), (requestId, currentRequest) -> {
return responseFuture;
});

// Send the request
CompletionStage<UStatus> status = transport.send(request);

return status.thenApply(s -> {
if (s.getCode() != UCode.OK) throw new UStatusException(s);
return s;
}).thenCompose(s -> responseFuture.thenApply(responseMessage ->
UPayload.pack(responseMessage.getPayload(), responseMessage.getAttributes().getPayloadFormat())
));
}


/**
* Close the RPC client and clean up any resources
*/
public void close() {
mRequests.clear();
transport.unregisterListener(UriFactory.ANY, transport.getSource(), mResponseHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,8 @@ public CompletionStage<UStatus> registerRequestHandler(UUri method, RequestHandl
return handler;
});
return CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build());
} catch (Exception e) {
if (e instanceof UStatusException statusException) {
return CompletableFuture.completedFuture(statusException.getStatus());
}
return CompletableFuture.completedFuture(
UStatus.newBuilder().setCode(UCode.INTERNAL).setMessage(e.getMessage()).build());
} catch (UStatusException e) {
return CompletableFuture.completedFuture(e.getStatus());
}
}

Expand Down
Loading

0 comments on commit a3dc39f

Please sign in to comment.