Skip to content

Commit

Permalink
Add Zenoh publish/subscribe and RPC examples (#4)
Browse files Browse the repository at this point in the history
* Add Zenoh publish/subscribe and RPC examples

* Removed time sleep

* Add zenoh default config method
  • Loading branch information
neelam-kushwah committed May 29, 2024
1 parent 2668a54 commit ac112fe
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 0 deletions.
76 changes: 76 additions & 0 deletions up_client_zenoh/examples/common_uuri.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""
SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation

See the NOTICE file(s) distributed with this work for additional
information regarding copyright ownership.

This program and the accompanying materials are made available under the
terms of the Apache License Version 2.0 which is available at

http://www.apache.org/licenses/LICENSE-2.0

SPDX-License-Identifier: Apache-2.0
"""

import json
import logging
from enum import Enum

import zenoh
from uprotocol.proto.uri_pb2 import UAuthority
from uprotocol.proto.uri_pb2 import UEntity
from uprotocol.proto.uri_pb2 import UResource
from uprotocol.uri.factory.uresource_builder import UResourceBuilder

# Configure the logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')


class ExampleType(Enum):
PUBLISHER = "publisher"
SUBSCRIBER = "subscriber"
RPC_SERVER = "rpc_server"
RPC_CLIENT = "rpc_client"


def authority() -> UAuthority:
return UAuthority(name="auth_name", id=bytes([1, 2, 3, 4]))


def entity(example_type: ExampleType) -> UEntity:
mapping = {ExampleType.PUBLISHER : ("publisher", 1), ExampleType.SUBSCRIBER: ("subscriber", 2),
ExampleType.RPC_SERVER: ("rpc_server", 3), ExampleType.RPC_CLIENT: ("rpc_client", 4)}
name, id = mapping[example_type]
return UEntity(name=name, id=1, version_major=id)


def pub_resource() -> UResource:
return UResource(name="door", instance="front_left", message="Door", id=5678)


def rpc_resource() -> UResource:
return UResourceBuilder.for_rpc_request("getTime", 5678)


def get_zenoh_config():
# start your zenoh router and provide router ip and port
zenoh_ip = "192.168.29.79" # zenoh router ip
zenoh_port = 9090 # zenoh router port
conf = zenoh.Config()
if zenoh_ip is not None:
endpoint = [f"tcp/{zenoh_ip}:{zenoh_port}"]
logging.debug(f"EEE: {endpoint}")
conf.insert_json5(zenoh.config.MODE_KEY, json.dumps("client"))
conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(endpoint))
return conf


# Initialize Zenoh with default configuration
def get_zenoh_default_config():
# Create a Zenoh configuration object with default settings
config = zenoh.Config()

# # Set the mode to Peer (or Router, Client depending on your use case)
# config = "peer"

return config
47 changes: 47 additions & 0 deletions up_client_zenoh/examples/publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""
SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation

See the NOTICE file(s) distributed with this work for additional
information regarding copyright ownership.

This program and the accompanying materials are made available under the
terms of the Apache License Version 2.0 which is available at

http://www.apache.org/licenses/LICENSE-2.0

SPDX-License-Identifier: Apache-2.0
"""

import time

from uprotocol.proto.uattributes_pb2 import UPriority
from uprotocol.proto.umessage_pb2 import UMessage
from uprotocol.proto.upayload_pb2 import UPayloadFormat, UPayload
from uprotocol.proto.uri_pb2 import UUri
from uprotocol.transport.builder.uattributesbuilder import UAttributesBuilder

from up_client_zenoh.examples import common_uuri
from up_client_zenoh.examples.common_uuri import authority, entity, ExampleType, pub_resource, \
get_zenoh_default_config
from up_client_zenoh.upclientzenoh import UPClientZenoh

publisher = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.PUBLISHER))


def publishtoZenoh():
# create uuri
uuri = UUri(entity=entity(ExampleType.PUBLISHER), resource=pub_resource())
cnt = 0
while True:
data = f"{cnt}"
attributes = UAttributesBuilder.publish(uuri, UPriority.UPRIORITY_CS4).build()
payload = UPayload(value=data.encode('utf-8'), format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT)
umessage = UMessage(attributes=attributes, payload=payload)
common_uuri.logging.debug(f"Sending {data} to {uuri}...")
publisher.send(umessage)
time.sleep(3)
cnt += 1


if __name__ == '__main__':
publishtoZenoh()
46 changes: 46 additions & 0 deletions up_client_zenoh/examples/rpc_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""
SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation

See the NOTICE file(s) distributed with this work for additional
information regarding copyright ownership.

This program and the accompanying materials are made available under the
terms of the Apache License Version 2.0 which is available at

http://www.apache.org/licenses/LICENSE-2.0

SPDX-License-Identifier: Apache-2.0
"""

from uprotocol.proto.uattributes_pb2 import CallOptions
from uprotocol.proto.upayload_pb2 import UPayload, UPayloadFormat
from uprotocol.proto.uri_pb2 import UUri

from up_client_zenoh.examples import common_uuri
from up_client_zenoh.examples.common_uuri import authority, entity, ExampleType, rpc_resource, get_zenoh_default_config
from up_client_zenoh.upclientzenoh import UPClientZenoh

rpc_client = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.RPC_CLIENT))


def send_rpc_request_to_zenoh():
# create uuri
uuri = UUri(entity=entity(ExampleType.RPC_SERVER), resource=rpc_resource())
# create UPayload
data = "GetCurrentTime"
payload = UPayload(length=0, format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT, value=bytes([ord(c) for c in data]))
# invoke RPC method
common_uuri.logging.debug(f"Send request to {uuri.entity}/{uuri.resource}")
response_future = rpc_client.invoke_method(uuri, payload, CallOptions(ttl=1000))
# process the result
result = response_future.result()
if result and isinstance(result.payload.value, bytes):
data = list(result.payload.value)
value = ''.join(chr(c) for c in data)
common_uuri.logging.debug(f"Receive rpc response {value}")
else:
common_uuri.logging.debug("Failed to get result from invoke_method.")


if __name__ == '__main__':
send_rpc_request_to_zenoh()
56 changes: 56 additions & 0 deletions up_client_zenoh/examples/rpc_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""
SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation

See the NOTICE file(s) distributed with this work for additional
information regarding copyright ownership.

This program and the accompanying materials are made available under the
terms of the Apache License Version 2.0 which is available at

http://www.apache.org/licenses/LICENSE-2.0

SPDX-License-Identifier: Apache-2.0
"""

import time
from datetime import datetime

from uprotocol.proto.umessage_pb2 import UMessage
from uprotocol.proto.upayload_pb2 import UPayload
from uprotocol.proto.upayload_pb2 import UPayloadFormat
from uprotocol.proto.uri_pb2 import UUri
from uprotocol.proto.ustatus_pb2 import UStatus
from uprotocol.transport.builder.uattributesbuilder import UAttributesBuilder
from uprotocol.transport.ulistener import UListener

from up_client_zenoh.examples import common_uuri
from up_client_zenoh.examples.common_uuri import authority, entity, ExampleType, rpc_resource, \
get_zenoh_default_config
from up_client_zenoh.upclientzenoh import UPClientZenoh

rpc_server = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.RPC_SERVER))


class RPCRequestListener(UListener):

def on_receive(self, msg: UMessage) -> UStatus:
attributes = msg.attributes
payload = msg.payload
value = ''.join(chr(c) for c in payload.value)
source = attributes.source
sink = attributes.sink
common_uuri.logging.debug(f"Receive {value} from {source} to {sink}")
response_payload = format(datetime.utcnow()).encode('utf-8')
payload = UPayload(value=response_payload, format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT)
attributes = UAttributesBuilder.response(msg.attributes).build()
rpc_server.send(UMessage(attributes=attributes, payload=payload))


if __name__ == '__main__':
uuri = UUri(entity=entity(ExampleType.RPC_SERVER), resource=rpc_resource())

common_uuri.logging.debug("Register the listener...")
rpc_server.register_listener(uuri, RPCRequestListener())

while True:
time.sleep(1)
49 changes: 49 additions & 0 deletions up_client_zenoh/examples/subscribe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""
SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation

See the NOTICE file(s) distributed with this work for additional
information regarding copyright ownership.

This program and the accompanying materials are made available under the
terms of the Apache License Version 2.0 which is available at

http://www.apache.org/licenses/LICENSE-2.0

SPDX-License-Identifier: Apache-2.0
"""

import time

from uprotocol.proto.umessage_pb2 import UMessage
from uprotocol.proto.uri_pb2 import UUri
from uprotocol.proto.ustatus_pb2 import UStatus
from uprotocol.transport.ulistener import UListener

from up_client_zenoh.examples import common_uuri
from up_client_zenoh.examples.common_uuri import authority, entity, ExampleType, pub_resource, \
get_zenoh_default_config
from up_client_zenoh.upclientzenoh import UPClientZenoh


class MyListener(UListener):

def on_receive(self, msg: UMessage) -> UStatus:
common_uuri.logging.debug('on receive called')
common_uuri.logging.debug(msg.payload.value)
common_uuri.logging.debug(msg.attributes.__str__())
return UStatus(message="Received event")


client = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.SUBSCRIBER))


def subscribeToZenoh():
# create uuri
uuri = UUri(entity=entity(ExampleType.PUBLISHER), resource=pub_resource())
client.register_listener(uuri, MyListener())


if __name__ == '__main__':
subscribeToZenoh()
while True:
time.sleep(1)

0 comments on commit ac112fe

Please sign in to comment.