Skip to content

Commit

Permalink
Incorporate the updates to UPayload
Browse files Browse the repository at this point in the history
  • Loading branch information
czfdcn committed May 6, 2024
1 parent 21f825c commit b6ecc43
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ static UMessage toMessage(CloudEvent event) {
Objects.requireNonNull(event);

UPayload payload = UPayload.newBuilder().setFormat(getUPayloadFormatFromContentType(event.getDataContentType()))
.setValue(getPayload(event).toByteString()).build();
.setData(getPayload(event).toByteString()).build();

UAttributes.Builder builder =
UAttributes.newBuilder()
Expand Down Expand Up @@ -448,9 +448,9 @@ static CloudEvent fromMessage(UMessage message) {
if(!contentType.isEmpty()){
cloudEventBuilder.withDataContentType(contentType);
}
// IMPORTANT: Currently, ONLY the VALUE format is supported in the SDK!
if (payload.hasValue())
cloudEventBuilder.withData(payload.getValue().toByteArray());

if (!payload.getData().isEmpty())
cloudEventBuilder.withData(payload.getData().toByteArray());

if (attributes.hasTtl())
cloudEventBuilder.withExtension("ttl",attributes.getTtl());
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/eclipse/uprotocol/rpc/RpcMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static <T extends Message> CompletionStage<T> mapResponse(CompletionStage<UMessa
}
Any any;
try {
any = Any.parseFrom(message.getPayload().getValue());
any = Any.parseFrom(message.getPayload().getData());

// Expected type
if (any.is(expectedClazz)) {
Expand Down Expand Up @@ -95,7 +95,7 @@ static <T extends Message> CompletionStage<RpcResult<T>> mapResponseToResult(

Any any;
try {
any = Any.parseFrom(message.getPayload().getValue());
any = Any.parseFrom(message.getPayload().getData());

// Expected type
if (any.is(expectedClazz)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public interface UPayloadBuilder {
static UPayload packToAny(Message message) {
return UPayload.newBuilder()
.setFormat(UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY)
.setValue(Any.pack(message).toByteString())
.setData(Any.pack(message).toByteString())
.build();
}

Expand All @@ -58,7 +58,7 @@ static UPayload packToAny(Message message) {
static UPayload pack(Message message) {
return UPayload.newBuilder()
.setFormat(UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF)
.setValue(message.toByteString())
.setData(message.toByteString())
.build();
}

Expand All @@ -72,18 +72,18 @@ static UPayload pack(Message message) {
*/
@SuppressWarnings("unchecked")
static <T extends Message> Optional<T> unpack(UPayload payload, Class<T> clazz) {
if (payload == null || !payload.hasValue()) {
if (payload == null || payload.getData().isEmpty()) {
return Optional.empty();
}
try {
switch (payload.getFormat()) {
case UPAYLOAD_FORMAT_UNSPECIFIED: // Default is WRAPPED_IN_ANY
case UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY :
return Optional.of(Any.parseFrom(payload.getValue()).unpack(clazz));
return Optional.of(Any.parseFrom(payload.getData()).unpack(clazz));

case UPAYLOAD_FORMAT_PROTOBUF:
T defaultInstance = com.google.protobuf.Internal.getDefaultInstance(clazz);
return Optional.of((T)defaultInstance.getParserForType().parseFrom(payload.getValue()));
return Optional.of((T)defaultInstance.getParserForType().parseFrom(payload.getData()));

default:
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ public void test_to_from_message_from_request_cloudevent() {
assertTrue(UCloudEvent.getSink(cloudEvent).isPresent());
assertEquals(UCloudEvent.getSink(cloudEvent).get(),
UriSerializer.serialize(result.getAttributes().getSink()));
assertEquals(UCloudEvent.getPayload(cloudEvent).toByteString(),result.getPayload().getValue());
assertEquals(UCloudEvent.getPayload(cloudEvent).toByteString(),result.getPayload().getData());
assertEquals(UCloudEvent.getSource(cloudEvent),UriSerializer.serialize(result.getAttributes().getSource()));
assertTrue(UCloudEvent.getPriority(cloudEvent).isPresent());
assertEquals(UCloudEvent.getPriority(cloudEvent).get(), UCloudEvent.getCePriority(result.getAttributes().getPriority()));
Expand All @@ -796,7 +796,7 @@ public void test_to_from_message_from_request_cloudevent_without_attributes() {
assertTrue(UCloudEvent.getSink(cloudEvent).isPresent());
assertEquals(UCloudEvent.getSink(cloudEvent).get(),
UriSerializer.serialize(result.getAttributes().getSink()));
assertEquals(UCloudEvent.getPayload(cloudEvent).toByteString(),result.getPayload().getValue());
assertEquals(UCloudEvent.getPayload(cloudEvent).toByteString(),result.getPayload().getData());
assertEquals(UCloudEvent.getSource(cloudEvent),UriSerializer.serialize(result.getAttributes().getSource()));
assertEquals(result.getAttributes().getPriority().getNumber(),0);

Expand Down Expand Up @@ -828,7 +828,7 @@ public void test_to_from_message_from_response_cloudevent() {
assertTrue(UCloudEvent.getSink(cloudEvent).isPresent());
assertEquals(UCloudEvent.getSink(cloudEvent).get(),
UriSerializer.serialize(result.getAttributes().getSink()));
assertEquals(UCloudEvent.getPayload(cloudEvent).toByteString(),result.getPayload().getValue());
assertEquals(UCloudEvent.getPayload(cloudEvent).toByteString(),result.getPayload().getData());
assertEquals(UCloudEvent.getSource(cloudEvent),UriSerializer.serialize(result.getAttributes().getSource()));
assertTrue(UCloudEvent.getPriority(cloudEvent).isPresent());
assertEquals(UCloudEvent.getPriority(cloudEvent).get(), UCloudEvent.getCePriority(result.getAttributes().getPriority()));
Expand Down
20 changes: 10 additions & 10 deletions src/test/java/org/eclipse/uprotocol/rpc/RpcTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class RpcTest {
public CompletionStage<UMessage> invokeMethod(UUri topic, UPayload payload, CallOptions options) {
UPayload data = UPayload.newBuilder()
.setFormat(UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY)
.setValue(Any.pack(Int32Value.of(3)).toByteString())
.setData(Any.pack(Int32Value.of(3)).toByteString())
.build();
return CompletableFuture.completedFuture(UMessage.newBuilder().setPayload(data).build());
}
Expand All @@ -69,7 +69,7 @@ public CompletionStage<UMessage> invokeMethod(UUri topic, UPayload payload, Call
Any any = Any.pack(status);
UPayload data = UPayload.newBuilder()
.setFormat(UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY)
.setValue(any.toByteString())
.setData(any.toByteString())
.build();
return CompletableFuture.completedFuture(UMessage.newBuilder().setPayload(data).build());
}
Expand All @@ -82,7 +82,7 @@ public CompletionStage<UMessage> invokeMethod(UUri topic, UPayload payload, Call
Any any = Any.pack(status);
UPayload data = UPayload.newBuilder()
.setFormat(UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY)
.setValue(any.toByteString())
.setData(any.toByteString())
.build();
return CompletableFuture.completedFuture(UMessage.newBuilder().setPayload(data).build());
}
Expand All @@ -93,7 +93,7 @@ public CompletionStage<UMessage> invokeMethod(UUri topic, UPayload payload, Call
public CompletionStage<UMessage> invokeMethod(UUri topic, UPayload payload, CallOptions options) {
UPayload response = UPayload.newBuilder()
.setFormat(UPayloadFormat.UPAYLOAD_FORMAT_RAW)
.setValue(ByteString.copyFrom(new byte[]{0}))
.setData(ByteString.copyFrom(new byte[]{0}))
.build();
return CompletableFuture.completedFuture(UMessage.newBuilder().setPayload(response).build());
}
Expand All @@ -114,7 +114,7 @@ public CompletionStage<UMessage> invokeMethod(UUri topic, UPayload payload, Call
Any any = Any.pack(Int32Value.of(42));
UPayload data = UPayload.newBuilder()
.setFormat(UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY)
.setValue(any.toByteString())
.setData(any.toByteString())
.build();
return CompletableFuture.completedFuture(UMessage.newBuilder().setPayload(data).build());
}
Expand Down Expand Up @@ -145,7 +145,7 @@ private static UPayload buildUPayload() {
Any any = Any.pack(buildCloudEvent());
return UPayload.newBuilder()
.setFormat(UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY)
.setValue(any.toByteString())
.setData(any.toByteString())
.build();
}

Expand All @@ -167,7 +167,7 @@ private static CompletionStage<io.cloudevents.v1.proto.CloudEvent> rpcResponse(
(message, exception) -> {
Any any;
try {
any = Any.parseFrom(message.getPayload().getValue());
any = Any.parseFrom(message.getPayload().getData());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e.getMessage(), e);
}
Expand Down Expand Up @@ -405,7 +405,7 @@ void test_success_invoke_method_happy_flow() {
assertFalse(true);

try {
any = Any.parseFrom(message.getPayload().getValue());
any = Any.parseFrom(message.getPayload().getData());
// happy flow, no exception
assertNull(exception);

Expand Down Expand Up @@ -435,7 +435,7 @@ void test_fail_invoke_method_when_invoke_method_returns_a_status() {
final CompletionStage<io.cloudevents.v1.proto.CloudEvent> stubReturnValue = rpcResponse.handle(
(message, exception) -> {
try {
Any any = Any.parseFrom(message.getPayload().getValue());
Any any = Any.parseFrom(message.getPayload().getData());
// happy flow, no exception
assertNull(exception);

Expand Down Expand Up @@ -498,7 +498,7 @@ void test_fail_invoke_method_when_invoke_method_returns_a_bad_proto() {
final CompletionStage<io.cloudevents.v1.proto.CloudEvent> stubReturnValue = rpcResponse.handle(
(message, exception) -> {
try {
Any any = Any.parseFrom(message.getPayload().getValue());
Any any = Any.parseFrom(message.getPayload().getData());
// happy flow, no exception
assertNull(exception);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void test_pack_to_any() {
UPayload payload = UPayloadBuilder.packToAny(message);
assertTrue(payload.getFormat() == UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY);
try {
Any any = Any.parseFrom(payload.getValue());
Any any = Any.parseFrom(payload.getData());
CloudEvent unpacked = any.unpack(CloudEvent.class);
assertEquals(message, unpacked);
} catch (Exception e) {
Expand All @@ -48,7 +48,7 @@ public void test_pack() {
UPayload payload = UPayloadBuilder.pack(message);
assertTrue(payload.getFormat() == UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF);
try {
CloudEvent unpacked = CloudEvent.parseFrom(payload.getValue());
CloudEvent unpacked = CloudEvent.parseFrom(payload.getData());
assertEquals(message, unpacked);
} catch (Exception e) {
e.printStackTrace();
Expand All @@ -65,7 +65,7 @@ public void test_unpack() {
.build();
UPayload payload = UPayload.newBuilder()
.setFormat(UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF)
.setValue(message.toByteString())
.setData(message.toByteString())
.build();
Optional<CloudEvent> unpacked = UPayloadBuilder.unpack(payload, CloudEvent.class);
assertTrue(unpacked.isPresent());
Expand Down Expand Up @@ -97,7 +97,7 @@ public void test_unpack_payload_without_format() {
.setType("myType")
.build();
UPayload payload = UPayload.newBuilder()
.setValue(Any.pack(message).toByteString())
.setData(Any.pack(message).toByteString())
.build();
Optional<CloudEvent> unpacked = UPayloadBuilder.unpack(payload, CloudEvent.class);
assertEquals(payload.getFormat(), UPayloadFormat.UPAYLOAD_FORMAT_UNSPECIFIED);
Expand Down Expand Up @@ -128,7 +128,7 @@ public void test_unpack_payload_without_unsupported_format() {
.build();
UPayload payload = UPayload.newBuilder()
.setFormat(UPayloadFormat.UPAYLOAD_FORMAT_SOMEIP)
.setValue(Any.pack(message).toByteString())
.setData(Any.pack(message).toByteString())
.build();
Optional<CloudEvent> unpacked = UPayloadBuilder.unpack(payload, CloudEvent.class);
assertTrue(unpacked.isEmpty());
Expand Down

0 comments on commit b6ecc43

Please sign in to comment.