From ac112fe58966b6293f052f3f2a237cdd337b1b69 Mon Sep 17 00:00:00 2001 From: Neelam Kushwah Date: Wed, 29 May 2024 07:03:44 +0530 Subject: [PATCH] Add Zenoh publish/subscribe and RPC examples (#4) * Add Zenoh publish/subscribe and RPC examples * Removed time sleep * Add zenoh default config method --- up_client_zenoh/examples/common_uuri.py | 76 +++++++++++++++++++++++++ up_client_zenoh/examples/publish.py | 47 +++++++++++++++ up_client_zenoh/examples/rpc_client.py | 46 +++++++++++++++ up_client_zenoh/examples/rpc_server.py | 56 ++++++++++++++++++ up_client_zenoh/examples/subscribe.py | 49 ++++++++++++++++ 5 files changed, 274 insertions(+) create mode 100644 up_client_zenoh/examples/common_uuri.py create mode 100644 up_client_zenoh/examples/publish.py create mode 100644 up_client_zenoh/examples/rpc_client.py create mode 100644 up_client_zenoh/examples/rpc_server.py create mode 100644 up_client_zenoh/examples/subscribe.py diff --git a/up_client_zenoh/examples/common_uuri.py b/up_client_zenoh/examples/common_uuri.py new file mode 100644 index 0000000..f6fabe4 --- /dev/null +++ b/up_client_zenoh/examples/common_uuri.py @@ -0,0 +1,76 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import json +import logging +from enum import Enum + +import zenoh +from uprotocol.proto.uri_pb2 import UAuthority +from uprotocol.proto.uri_pb2 import UEntity +from uprotocol.proto.uri_pb2 import UResource +from uprotocol.uri.factory.uresource_builder import UResourceBuilder + +# Configure the logging +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') + + +class ExampleType(Enum): + PUBLISHER = "publisher" + SUBSCRIBER = "subscriber" + RPC_SERVER = "rpc_server" + RPC_CLIENT = "rpc_client" + + +def authority() -> UAuthority: + return UAuthority(name="auth_name", id=bytes([1, 2, 3, 4])) + + +def entity(example_type: ExampleType) -> UEntity: + mapping = {ExampleType.PUBLISHER : ("publisher", 1), ExampleType.SUBSCRIBER: ("subscriber", 2), + ExampleType.RPC_SERVER: ("rpc_server", 3), ExampleType.RPC_CLIENT: ("rpc_client", 4)} + name, id = mapping[example_type] + return UEntity(name=name, id=1, version_major=id) + + +def pub_resource() -> UResource: + return UResource(name="door", instance="front_left", message="Door", id=5678) + + +def rpc_resource() -> UResource: + return UResourceBuilder.for_rpc_request("getTime", 5678) + + +def get_zenoh_config(): + # start your zenoh router and provide router ip and port + zenoh_ip = "192.168.29.79" # zenoh router ip + zenoh_port = 9090 # zenoh router port + conf = zenoh.Config() + if zenoh_ip is not None: + endpoint = [f"tcp/{zenoh_ip}:{zenoh_port}"] + logging.debug(f"EEE: {endpoint}") + conf.insert_json5(zenoh.config.MODE_KEY, json.dumps("client")) + conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(endpoint)) + return conf + + +# Initialize Zenoh with default configuration +def get_zenoh_default_config(): + # Create a Zenoh configuration object with default settings + config = zenoh.Config() + + # # Set the mode to Peer (or Router, Client depending on your use case) + # config = "peer" + + return config diff --git a/up_client_zenoh/examples/publish.py b/up_client_zenoh/examples/publish.py new file mode 100644 index 0000000..5064d55 --- /dev/null +++ b/up_client_zenoh/examples/publish.py @@ -0,0 +1,47 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import time + +from uprotocol.proto.uattributes_pb2 import UPriority +from uprotocol.proto.umessage_pb2 import UMessage +from uprotocol.proto.upayload_pb2 import UPayloadFormat, UPayload +from uprotocol.proto.uri_pb2 import UUri +from uprotocol.transport.builder.uattributesbuilder import UAttributesBuilder + +from up_client_zenoh.examples import common_uuri +from up_client_zenoh.examples.common_uuri import authority, entity, ExampleType, pub_resource, \ + get_zenoh_default_config +from up_client_zenoh.upclientzenoh import UPClientZenoh + +publisher = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.PUBLISHER)) + + +def publishtoZenoh(): + # create uuri + uuri = UUri(entity=entity(ExampleType.PUBLISHER), resource=pub_resource()) + cnt = 0 + while True: + data = f"{cnt}" + attributes = UAttributesBuilder.publish(uuri, UPriority.UPRIORITY_CS4).build() + payload = UPayload(value=data.encode('utf-8'), format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT) + umessage = UMessage(attributes=attributes, payload=payload) + common_uuri.logging.debug(f"Sending {data} to {uuri}...") + publisher.send(umessage) + time.sleep(3) + cnt += 1 + + +if __name__ == '__main__': + publishtoZenoh() diff --git a/up_client_zenoh/examples/rpc_client.py b/up_client_zenoh/examples/rpc_client.py new file mode 100644 index 0000000..a44bac5 --- /dev/null +++ b/up_client_zenoh/examples/rpc_client.py @@ -0,0 +1,46 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from uprotocol.proto.uattributes_pb2 import CallOptions +from uprotocol.proto.upayload_pb2 import UPayload, UPayloadFormat +from uprotocol.proto.uri_pb2 import UUri + +from up_client_zenoh.examples import common_uuri +from up_client_zenoh.examples.common_uuri import authority, entity, ExampleType, rpc_resource, get_zenoh_default_config +from up_client_zenoh.upclientzenoh import UPClientZenoh + +rpc_client = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.RPC_CLIENT)) + + +def send_rpc_request_to_zenoh(): + # create uuri + uuri = UUri(entity=entity(ExampleType.RPC_SERVER), resource=rpc_resource()) + # create UPayload + data = "GetCurrentTime" + payload = UPayload(length=0, format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT, value=bytes([ord(c) for c in data])) + # invoke RPC method + common_uuri.logging.debug(f"Send request to {uuri.entity}/{uuri.resource}") + response_future = rpc_client.invoke_method(uuri, payload, CallOptions(ttl=1000)) + # process the result + result = response_future.result() + if result and isinstance(result.payload.value, bytes): + data = list(result.payload.value) + value = ''.join(chr(c) for c in data) + common_uuri.logging.debug(f"Receive rpc response {value}") + else: + common_uuri.logging.debug("Failed to get result from invoke_method.") + + +if __name__ == '__main__': + send_rpc_request_to_zenoh() diff --git a/up_client_zenoh/examples/rpc_server.py b/up_client_zenoh/examples/rpc_server.py new file mode 100644 index 0000000..49400d5 --- /dev/null +++ b/up_client_zenoh/examples/rpc_server.py @@ -0,0 +1,56 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import time +from datetime import datetime + +from uprotocol.proto.umessage_pb2 import UMessage +from uprotocol.proto.upayload_pb2 import UPayload +from uprotocol.proto.upayload_pb2 import UPayloadFormat +from uprotocol.proto.uri_pb2 import UUri +from uprotocol.proto.ustatus_pb2 import UStatus +from uprotocol.transport.builder.uattributesbuilder import UAttributesBuilder +from uprotocol.transport.ulistener import UListener + +from up_client_zenoh.examples import common_uuri +from up_client_zenoh.examples.common_uuri import authority, entity, ExampleType, rpc_resource, \ + get_zenoh_default_config +from up_client_zenoh.upclientzenoh import UPClientZenoh + +rpc_server = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.RPC_SERVER)) + + +class RPCRequestListener(UListener): + + def on_receive(self, msg: UMessage) -> UStatus: + attributes = msg.attributes + payload = msg.payload + value = ''.join(chr(c) for c in payload.value) + source = attributes.source + sink = attributes.sink + common_uuri.logging.debug(f"Receive {value} from {source} to {sink}") + response_payload = format(datetime.utcnow()).encode('utf-8') + payload = UPayload(value=response_payload, format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT) + attributes = UAttributesBuilder.response(msg.attributes).build() + rpc_server.send(UMessage(attributes=attributes, payload=payload)) + + +if __name__ == '__main__': + uuri = UUri(entity=entity(ExampleType.RPC_SERVER), resource=rpc_resource()) + + common_uuri.logging.debug("Register the listener...") + rpc_server.register_listener(uuri, RPCRequestListener()) + + while True: + time.sleep(1) diff --git a/up_client_zenoh/examples/subscribe.py b/up_client_zenoh/examples/subscribe.py new file mode 100644 index 0000000..cab1b83 --- /dev/null +++ b/up_client_zenoh/examples/subscribe.py @@ -0,0 +1,49 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import time + +from uprotocol.proto.umessage_pb2 import UMessage +from uprotocol.proto.uri_pb2 import UUri +from uprotocol.proto.ustatus_pb2 import UStatus +from uprotocol.transport.ulistener import UListener + +from up_client_zenoh.examples import common_uuri +from up_client_zenoh.examples.common_uuri import authority, entity, ExampleType, pub_resource, \ + get_zenoh_default_config +from up_client_zenoh.upclientzenoh import UPClientZenoh + + +class MyListener(UListener): + + def on_receive(self, msg: UMessage) -> UStatus: + common_uuri.logging.debug('on receive called') + common_uuri.logging.debug(msg.payload.value) + common_uuri.logging.debug(msg.attributes.__str__()) + return UStatus(message="Received event") + + +client = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.SUBSCRIBER)) + + +def subscribeToZenoh(): + # create uuri + uuri = UUri(entity=entity(ExampleType.PUBLISHER), resource=pub_resource()) + client.register_listener(uuri, MyListener()) + + +if __name__ == '__main__': + subscribeToZenoh() + while True: + time.sleep(1)