From 21cec63efb4f8e1c4248b4f2f1a0e25d45af4c1d Mon Sep 17 00:00:00 2001 From: Alex Campos Date: Mon, 6 Mar 2023 13:21:35 -0800 Subject: [PATCH 1/8] Add request-reply example for Python --- .../request_reply/cs/USER_QOS_PROFILES.xml | 17 +- .../connext_dds/request_reply/py/Primes.idl | 71 +++++++++ .../connext_dds/request_reply/py/Primes.py | 35 +++++ .../connext_dds/request_reply/py/README.md | 60 ++++++++ .../request_reply/py/USER_QOS_PROFILES.xml | 145 ++++++++++++++++++ .../request_reply/py/primes_replier.py | 128 ++++++++++++++++ .../request_reply/py/primes_requester.py | 113 ++++++++++++++ .../request_reply/py/primes_simple_replier.py | 121 +++++++++++++++ 8 files changed, 680 insertions(+), 10 deletions(-) create mode 100644 examples/connext_dds/request_reply/py/Primes.idl create mode 100644 examples/connext_dds/request_reply/py/Primes.py create mode 100644 examples/connext_dds/request_reply/py/README.md create mode 100644 examples/connext_dds/request_reply/py/USER_QOS_PROFILES.xml create mode 100644 examples/connext_dds/request_reply/py/primes_replier.py create mode 100644 examples/connext_dds/request_reply/py/primes_requester.py create mode 100644 examples/connext_dds/request_reply/py/primes_simple_replier.py diff --git a/examples/connext_dds/request_reply/cs/USER_QOS_PROFILES.xml b/examples/connext_dds/request_reply/cs/USER_QOS_PROFILES.xml index 78a6757f5..c2b2eb264 100644 --- a/examples/connext_dds/request_reply/cs/USER_QOS_PROFILES.xml +++ b/examples/connext_dds/request_reply/cs/USER_QOS_PROFILES.xml @@ -1,9 +1,8 @@ - + - + 100000000 0 - 10000000 + 10000000 0 - 10000000 + 10000000 0 @@ -63,7 +62,7 @@ LENGTH_UNLIMITED @@ -99,13 +98,11 @@ - - - diff --git a/examples/connext_dds/request_reply/py/Primes.idl b/examples/connext_dds/request_reply/py/Primes.idl new file mode 100644 index 000000000..ebf07b890 --- /dev/null +++ b/examples/connext_dds/request_reply/py/Primes.idl @@ -0,0 +1,71 @@ +/*****************************************************************************/ +/* (c) Copyright, Real-Time Innovations, All rights reserved. */ +/* */ +/* Permission to modify and use for internal purposes granted. */ +/* This software is provided "as is", without warranty, express or implied. */ +/* */ +/*****************************************************************************/ + +/* --- Request type: ------------------------------------------------------- */ + +/* + * This is the request type. + * + * It simply contains an integer that requests all the prime numbers below it. + * + */ +struct PrimeNumberRequest { + /* + * Requests the calculation of the prime numbers below n + */ + int32 n; + + /* + * How many prime numbers should be included in each reply sample. + * + * A smaller value means that results are received more frequently + * (and hence more data samples are sent) + */ + int32 primes_per_reply; +}; + +/* --- Reply type: --------------------------------------------------------- */ + +/* + * Indicates the status of one of the multiple possible replies for a request + */ +enum PrimeNumberCalculationStatus { + /* + * Indicates that this reply contains a new sequence of + * prime numbers for a request, but there are still more to come + */ + REPLY_IN_PROGRESS, + /* + * Indicates that this is the last sequence of + * prime numbers for a request. + */ + REPLY_COMPLETED, + /* + * Indicates that there was an error. After an error + * there won't be any more replies for a request + */ + REPLY_ERROR +}; + +/* + * This is the reply type. + * + * The replier sends sequences of prime numbers as they are being calculated. + * + */ +struct PrimeNumberReply { + /* + * A sequence of prime numbers + */ + sequence primes; + + /* + * Status information about this reply + */ + PrimeNumberCalculationStatus status; +}; diff --git a/examples/connext_dds/request_reply/py/Primes.py b/examples/connext_dds/request_reply/py/Primes.py new file mode 100644 index 000000000..fbfd98959 --- /dev/null +++ b/examples/connext_dds/request_reply/py/Primes.py @@ -0,0 +1,35 @@ +# +# (c) 2023 Copyright, Real-Time Innovations, Inc. All rights reserved. +# +# RTI grants Licensee a license to use, modify, compile, and create derivative +# works of the Software solely for use with RTI products. The Software is +# provided "as is", with no warranty of any type, including any warranty for +# fitness for any purpose. RTI is under no obligation to maintain or support +# the Software. RTI shall not be liable for any incidental or consequential +# damages arising out of the use or inability to use the software. +# + +# This file was generated by RTI Code Generator with the following command: +# rtiddsgen -language python -unboundedSupport Primes.idl + +from dataclasses import field +from typing import Union, Sequence, Optional +import rti.idl as idl +from enum import IntEnum + + +@idl.struct +class PrimeNumberRequest: + n: idl.int32 = 0 + primes_per_reply: idl.int32 = 0 + +@idl.enum +class PrimeNumberCalculationStatus(IntEnum): + REPLY_IN_PROGRESS = 0 + REPLY_COMPLETED = 1 + REPLY_ERROR = 2 + +@idl.struct +class PrimeNumberReply: + primes: Sequence[idl.int32] = field(default_factory = idl.array_factory(idl.int32)) + status: PrimeNumberCalculationStatus = PrimeNumberCalculationStatus.REPLY_IN_PROGRESS diff --git a/examples/connext_dds/request_reply/py/README.md b/examples/connext_dds/request_reply/py/README.md new file mode 100644 index 000000000..f087dad8a --- /dev/null +++ b/examples/connext_dds/request_reply/py/README.md @@ -0,0 +1,60 @@ +# Example Code: Request-Reply + +If you haven't used the RTI Connext Python API before, first check the +[Getting Started Guide](https://community.rti.com/static/documentation/connext-dds/7.0.0/doc/manuals/connext_dds_professional/getting_started_guide/index.html). + +## Running the Example + +In two separate command prompt windows for the requester and replier run the +following commands from the example directory (this is necessary to ensure the +application loads the QoS defined in *USER_QOS_PROFILES.xml*): + +```sh +python primes_replier.py +python primes_requester.py +``` + +Where ```` is the number to calculate the primes below. + +Use ``-h`` to see the full list of arguments. + +## Replier Output + +The replier will print a message every time it receives a request, for example: + +```plaintext +Calculating prime numbers below 100... +DONE +``` + +## Requester Output + +The requester prints the prime numbers sent by the replier, as they are received. + +For example: + +```sh +$ python primes_requester.py 100 +Sending a request to calculate the prime numbers <= 100 in sequences of 5 or fewer elements +[2, 3, 5, 7, 11] +[13, 17, 19, 23, 29] +[31, 37, 41, 43, 47] +[53, 59, 61, 67, 71] +[73, 79, 83, 89, 97] +DONE +``` + +Each line of numbers is an individual reply. You can set `--primes-per-reply` +to control this behavior in the example (by default it's 5). + +```sh +$ python primes_requester.py 100 --primes-per-reply 10 +Running PrimesRequester on domain 0 +Sending a request to calculate the prime numbers <= 100 in sequences of 10 or fewer elements +[2, 3, 5, 7, 11, 13, 17, 19, 23, 29] +[31, 37, 41, 43, 47, 53, 59, 61, 67, 71] +[73, 79, 83, 89, 97] +DONE +``` + +This demonstrates how a single request can be served by multiple replies. diff --git a/examples/connext_dds/request_reply/py/USER_QOS_PROFILES.xml b/examples/connext_dds/request_reply/py/USER_QOS_PROFILES.xml new file mode 100644 index 000000000..c2b2eb264 --- /dev/null +++ b/examples/connext_dds/request_reply/py/USER_QOS_PROFILES.xml @@ -0,0 +1,145 @@ + + + + + + + + + + + + RELIABLE_RELIABILITY_QOS + + 10 + 0 + + + + + KEEP_ALL_HISTORY_QOS + + + + + + + LENGTH_UNLIMITED + + + 2 + + + 0 + 100000000 + + + 0 + 10000000 + + + 0 + 10000000 + + + 0 + 0 + + + 0 + 0 + + 32 + 32 + + + + + + + LENGTH_UNLIMITED + + + + + + + + RELIABLE_RELIABILITY_QOS + + 10 + 0 + + + + + KEEP_ALL_HISTORY_QOS + + + + + + + 0 + 0 + + + 0 + 0 + + + + + + + + + + + + + TRANSIENT_LOCAL_DURABILITY_QOS + + + + + + + VOLATILE_DURABILITY_QOS + + + + + + + + + + + VOLATILE_DURABILITY_QOS + + + + + + + TRANSIENT_LOCAL_DURABILITY_QOS + + + + + + diff --git a/examples/connext_dds/request_reply/py/primes_replier.py b/examples/connext_dds/request_reply/py/primes_replier.py new file mode 100644 index 000000000..d74258658 --- /dev/null +++ b/examples/connext_dds/request_reply/py/primes_replier.py @@ -0,0 +1,128 @@ +# +# (c) 2023 Copyright, Real-Time Innovations, Inc. All rights reserved. +# +# RTI grants Licensee a license to use, modify, compile, and create derivative +# works of the Software solely for use with RTI products. The Software is +# provided "as is", with no warranty of any type, including any warranty for +# fitness for any purpose. RTI is under no obligation to maintain or support +# the Software. RTI shall not be liable for any incidental or consequential +# damages arising out of the use or inability to use the software. +# + +import argparse +import math +from typing import Sequence, Tuple + +import rti.connextdds as dds +from rti.request import Replier +import Primes + + +def is_prime(val): + if val <= 1: + return False + if val == 2: + return True + if val > 2 and val % 2 == 0: + return False + + max_div = int(math.floor(math.sqrt(val))) + for i in range(3, 1 + max_div, 2): + if val % i == 0: + return False + return True + + +def replier_main(domain_id): + # Get the QoS from a profile in USER_QOS_PROFILES.xml (the default + # QosProvider will load the USER_QOS_PROFILES.xml file from the current + # working directory) + qos_provider = dds.QosProvider.default + writer_qos = qos_provider.datawriter_qos_from_profile( + "RequestReplyExampleProfiles::ReplierExampleProfile" + ) + reader_qos = qos_provider.datareader_qos_from_profile( + "RequestReplyExampleProfiles::ReplierExampleProfile" + ) + + participant = dds.DomainParticipant(domain_id) + + replier = Replier( + request_type=Primes.PrimeNumberRequest, + reply_type=Primes.PrimeNumberReply, + participant=participant, + service_name="PrimeCalculator", + datawriter_qos=writer_qos, + datareader_qos=reader_qos, + ) + + print( + f"Prime calculation replier started on domain {domain_id}" + ) + + max_wait = dds.Duration.from_seconds(20) + requests: Sequence[ + Tuple[Primes.PrimeNumberRequest, dds.SampleInfo] + ] = replier.receive_requests(max_wait) + + while len(requests) > 0: + for request, request_info in requests: + if not request_info.valid: + continue + + if request.n <= 0 or request.primes_per_reply <= 0: + error_reply = Primes.PrimeNumberReply( + status=Primes.PrimeNumberCalculationStatus.REPLY_ERROR + ) + replier.send_reply(error_reply, request_info) + else: + n = request.n + print(f"Calculating prime numbers below {n}...") + + max_count = request.primes_per_reply + primes = dds.Int32Seq() + + reply = Primes.PrimeNumberReply() + for m in range(1, n + 1): + if is_prime(m): + primes.append(m) + if len(primes) == max_count: + reply.primes = primes + reply.status = ( + Primes.PrimeNumberCalculationStatus.REPLY_IN_PROGRESS + ) + replier.send_reply( + reply, request_info, final=False + ) + primes.clear() + + reply.primes = primes + reply.status = ( + Primes.PrimeNumberCalculationStatus.REPLY_COMPLETED + ) + replier.send_reply(reply, request_info) + + print("DONE") + + requests = replier.receive_requests(max_wait) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="RTI Connext DDS Example: Request-Reply (Replier)" + ) + parser.add_argument( + "-d", + "--domain", + type=int, + default=0, + help="DDS Domain ID (default: 0)", + ) + + args = parser.parse_args() + assert 0 <= args.domain < 233 + + try: + replier_main(args.domain) + except dds.TimeoutError: + print("Timeout: no requests received") diff --git a/examples/connext_dds/request_reply/py/primes_requester.py b/examples/connext_dds/request_reply/py/primes_requester.py new file mode 100644 index 000000000..8278cf8ac --- /dev/null +++ b/examples/connext_dds/request_reply/py/primes_requester.py @@ -0,0 +1,113 @@ +# +# (c) 2023 Copyright, Real-Time Innovations, Inc. All rights reserved. +# +# RTI grants Licensee a license to use, modify, compile, and create derivative +# works of the Software solely for use with RTI products. The Software is +# provided "as is", with no warranty of any type, including any warranty for +# fitness for any purpose. RTI is under no obligation to maintain or support +# the Software. RTI shall not be liable for any incidental or consequential +# damages arising out of the use or inability to use the software. +# + +import argparse +import time +from typing import Sequence + +import rti.connextdds as dds +from rti.request import Requester +import Primes + + +def requester_main(domain_id, n, primes_per_reply): + # Get the QoS from a profile in USER_QOS_PROFILES.xml (the default + # QosProvider will load the USER_QOS_PROFILES.xml file from the current + # working directory) + qos_provider = dds.QosProvider.default + writer_qos = qos_provider.datawriter_qos_from_profile( + "RequestReplyExampleProfiles::RequesterExampleProfile" + ) + reader_qos = qos_provider.datareader_qos_from_profile( + "RequestReplyExampleProfiles::RequesterExampleProfile" + ) + + participant = dds.DomainParticipant(domain_id) + + requester = Requester( + request_type=Primes.PrimeNumberRequest, + reply_type=Primes.PrimeNumberReply, + participant=participant, + service_name="PrimeCalculator", + datawriter_qos=writer_qos, + datareader_qos=reader_qos, + ) + + print(f"Waiting to discover replier on domain {domain_id}") + + while requester.matched_replier_count == 0: + time.sleep(0.1) + + prime_number_request = Primes.PrimeNumberRequest( + n=n, primes_per_reply=primes_per_reply + ) + + print( + f"Sending a request to calculate the prime numbers <= {n} in sequences of {primes_per_reply} or fewer elements" + ) + + request_id = requester.send_request(prime_number_request) + + max_wait = dds.Duration.from_seconds(20) + in_progress = True + while in_progress: + if not requester.wait_for_replies( + max_wait, related_request_id=request_id + ): + raise dds.TimeoutError("Timed out waiting for replies") + + replies: Sequence[ + Primes.PrimeNumberReply + ] = requester.reply_datareader.take_data() + for reply in replies: + print(list(reply.primes)) + + if ( + reply.status + != Primes.PrimeNumberCalculationStatus.REPLY_IN_PROGRESS + ): + in_progress = False + if ( + reply.status + == Primes.PrimeNumberCalculationStatus.REPLY_ERROR + ): + raise RuntimeError("Error in Replier") + + print("DONE") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="RTI Connext DDS Example: Request-Reply (Requester)" + ) + parser.add_argument( + "-d", + "--domain", + type=int, + default=0, + help="DDS Domain ID (default: 0)", + ) + parser.add_argument( + "-p", + "--primes-per-reply", + type=int, + default=5, + help="Number of primes per reply (min: 5, default: 5)", + ) + parser.add_argument( + "n", type=int, help="Request will retrieve prime numbers <= n" + ) + + args = parser.parse_args() + assert 0 <= args.domain < 233 + assert args.primes_per_reply >= 5 + + requester_main(args.domain, args.n, args.primes_per_reply) diff --git a/examples/connext_dds/request_reply/py/primes_simple_replier.py b/examples/connext_dds/request_reply/py/primes_simple_replier.py new file mode 100644 index 000000000..13c83c689 --- /dev/null +++ b/examples/connext_dds/request_reply/py/primes_simple_replier.py @@ -0,0 +1,121 @@ +# +# (c) 2023 Copyright, Real-Time Innovations, Inc. All rights reserved. +# +# RTI grants Licensee a license to use, modify, compile, and create derivative +# works of the Software solely for use with RTI products. The Software is +# provided "as is", with no warranty of any type, including any warranty for +# fitness for any purpose. RTI is under no obligation to maintain or support +# the Software. RTI shall not be liable for any incidental or consequential +# damages arising out of the use or inability to use the software. +# + +import argparse +import time +import math + +import rti.connextdds as dds +from rti.request import SimpleReplier +import Primes + +request_serviced = True + + +def is_prime(val): + if val <= 1: + return False + if val == 2: + return True + if val > 2 and val % 2 == 0: + return False + + max_div = math.floor(math.sqrt(val)) + for i in range(3, 1 + max_div, 2): + if val % i == 0: + return False + return True + + +request_serviced = True + +def request_handler(request: Primes.PrimeNumberRequest): + global request_serviced + request_serviced = True + if request.n <= 0 or request.primes_per_reply <= 0: + error_reply = Primes.PrimeNumberReply( + status=Primes.PrimeNumberCalculationStatus.REPLY_ERROR + ) + return error_reply + else: + n = request.n + print(f"Calculating prime numbers below {n}...") + + max_count = request.primes_per_reply + primes = dds.Int32Seq() + + reply = Primes.PrimeNumberReply() + for m in range(1, n + 1): + if is_prime(m): + primes.append(m) + if len(primes) > max_count: + reply.status = ( + Primes.PrimeNumberCalculationStatus.REPLY_ERROR + ) + print("Error: too many primes for a single reply") + return reply + + reply.primes = primes + reply.status = Primes.PrimeNumberCalculationStatus.REPLY_COMPLETED + print("DONE") + + return reply + + +def replier_main(domain_id): + # Get the QoS from a profile in USER_QOS_PROFILES.xml (the default + # QosProvider will load the USER_QOS_PROFILES.xml file from the current + # working directory) + qos_provider = dds.QosProvider.default + writer_qos = qos_provider.datawriter_qos_from_profile( + "RequestReplyExampleProfiles::ReplierExampleProfile" + ) + reader_qos = qos_provider.datareader_qos_from_profile( + "RequestReplyExampleProfiles::ReplierExampleProfile" + ) + + participant = dds.DomainParticipant(domain_id) + + replier = SimpleReplier( + request_type=Primes.PrimeNumberRequest, + reply_type=Primes.PrimeNumberReply, + participant=participant, + service_name="PrimeCalculator", + handler=request_handler, + datawriter_qos=writer_qos, + datareader_qos=reader_qos, + ) + print(f"Prime calculation replier started on domain {domain_id}...") + + global request_serviced + while request_serviced: + request_serviced = False + time.sleep(20) + + print("Timed out waiting for requests") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="RTI Connext DDS Example: Request-Reply (Simple Replier)" + ) + parser.add_argument( + "-d", + "--domain", + type=int, + default=0, + help="DDS Domain ID (default: 0)", + ) + + args = parser.parse_args() + assert 0 <= args.domain < 233 + + replier_main(args.domain) From ec77c26fe5da3e60ada781b48a8d5e55838693e9 Mon Sep 17 00:00:00 2001 From: Alex Campos Date: Mon, 6 Mar 2023 13:24:14 -0800 Subject: [PATCH 2/8] Fix formatting (black -l 79) --- examples/connext_dds/request_reply/py/Primes.py | 10 ++++++++-- .../connext_dds/request_reply/py/primes_replier.py | 4 +--- .../request_reply/py/primes_simple_replier.py | 1 + 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/examples/connext_dds/request_reply/py/Primes.py b/examples/connext_dds/request_reply/py/Primes.py index fbfd98959..676638bf5 100644 --- a/examples/connext_dds/request_reply/py/Primes.py +++ b/examples/connext_dds/request_reply/py/Primes.py @@ -23,13 +23,19 @@ class PrimeNumberRequest: n: idl.int32 = 0 primes_per_reply: idl.int32 = 0 + @idl.enum class PrimeNumberCalculationStatus(IntEnum): REPLY_IN_PROGRESS = 0 REPLY_COMPLETED = 1 REPLY_ERROR = 2 + @idl.struct class PrimeNumberReply: - primes: Sequence[idl.int32] = field(default_factory = idl.array_factory(idl.int32)) - status: PrimeNumberCalculationStatus = PrimeNumberCalculationStatus.REPLY_IN_PROGRESS + primes: Sequence[idl.int32] = field( + default_factory=idl.array_factory(idl.int32) + ) + status: PrimeNumberCalculationStatus = ( + PrimeNumberCalculationStatus.REPLY_IN_PROGRESS + ) diff --git a/examples/connext_dds/request_reply/py/primes_replier.py b/examples/connext_dds/request_reply/py/primes_replier.py index d74258658..44ad70e70 100644 --- a/examples/connext_dds/request_reply/py/primes_replier.py +++ b/examples/connext_dds/request_reply/py/primes_replier.py @@ -56,9 +56,7 @@ def replier_main(domain_id): datareader_qos=reader_qos, ) - print( - f"Prime calculation replier started on domain {domain_id}" - ) + print(f"Prime calculation replier started on domain {domain_id}") max_wait = dds.Duration.from_seconds(20) requests: Sequence[ diff --git a/examples/connext_dds/request_reply/py/primes_simple_replier.py b/examples/connext_dds/request_reply/py/primes_simple_replier.py index 13c83c689..9fd05d629 100644 --- a/examples/connext_dds/request_reply/py/primes_simple_replier.py +++ b/examples/connext_dds/request_reply/py/primes_simple_replier.py @@ -37,6 +37,7 @@ def is_prime(val): request_serviced = True + def request_handler(request: Primes.PrimeNumberRequest): global request_serviced request_serviced = True From bb07d0af87f0a1e61ae987c73b2bfc40fe653476 Mon Sep 17 00:00:00 2001 From: Alex Campos Date: Mon, 6 Mar 2023 13:26:50 -0800 Subject: [PATCH 3/8] Update link to xsd --- examples/connext_dds/request_reply/cs/USER_QOS_PROFILES.xml | 2 +- examples/connext_dds/request_reply/py/USER_QOS_PROFILES.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/connext_dds/request_reply/cs/USER_QOS_PROFILES.xml b/examples/connext_dds/request_reply/cs/USER_QOS_PROFILES.xml index c2b2eb264..8366b6aa7 100644 --- a/examples/connext_dds/request_reply/cs/USER_QOS_PROFILES.xml +++ b/examples/connext_dds/request_reply/cs/USER_QOS_PROFILES.xml @@ -1,6 +1,6 @@ - +