Skip to content

Commit

Permalink
Merge pull request #95 from Unity-Technologies/dev
Browse files Browse the repository at this point in the history
Version 0.6.0
  • Loading branch information
LaurieCheers-unity authored Oct 1, 2021
2 parents f978d94 + 5ea9b7b commit 715a402
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 67 deletions.
27 changes: 27 additions & 0 deletions .github/workflows/stale.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: 'Stale issue handler'
on:
workflow_dispatch:
schedule:
- cron: '0 17 * * *' # 17:00 UTC; 10:00 PDT

permissions:
issues: write

jobs:
stale:
runs-on: ubuntu-latest
steps:
- uses: actions/stale@v4.0.0
id: stale
with:
stale-issue-label: 'stale'
stale-issue-message: 'This issue has been marked stale because it has been open for 14 days with no activity. Please remove the stale label or comment on this issue, or the issue will be automatically closed in the next 14 days.'
days-before-stale: 14
days-before-pr-stale: -1
days-before-close: 14
days-before-pr-close: -1
exempt-issue-labels: 'blocked,must,should,keep,pinned,work-in-progress,request,announcement'
close-issue-message: 'This issue has been marked stale for 14 days and will now be closed. If this issue is still valid, please ping a maintainer.'
- name: Print outputs
run: echo ${{ join(steps.stale.outputs.*, ',') }}

2 changes: 1 addition & 1 deletion .yamato/yamato-config.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: Endpoint Unit Tests
agent:
type: Unity::VM
image: robotics/ci-ubuntu20:latest
image: robotics/ci-ubuntu20:v0.1.0-795910
flavor: i1.large
commands:
# run unit tests and save test results in /home/bokken/build/output/Unity-Technologies/ROS-TCP-Endpoint
Expand Down
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,26 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) a

### Fixed

## [0.6.0] - 2021-09-30

Add the [Close Stale Issues](https://github.com/marketplace/actions/close-stale-issues) action

### Upgrade Notes

### Known Issues

### Added

Support for queue_size and latch for publishers. (https://github.com/Unity-Technologies/ROS-TCP-Endpoint/issues/82)

### Changed

### Deprecated

### Removed

### Fixed

## [0.5.0] - 2021-07-15

### Upgrade Notes
Expand Down Expand Up @@ -56,6 +76,8 @@ Add linter, unit tests, and test coverage reporting

### Changed

Improving the performance of the read_message in client.py, This is done by receiving the entire message all at once instead of reading 1024 byte chunks and stitching them together as you go.

### Deprecated

### Removed
Expand Down
2 changes: 1 addition & 1 deletion package.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<package format="2">
<name>ros_tcp_endpoint</name>
<version>0.5.0</version>
<version>0.6.0</version>
<description>Acts as the bridge between Unity messages sent via Websocket and ROS messages.</description>

<maintainer email="unity-robotics@unity3d.com">Unity Robotics</maintainer>
Expand Down
38 changes: 16 additions & 22 deletions src/ros_tcp_endpoint/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ def __init__(self, conn, tcp_server, incoming_ip, incoming_port):
Set class variables
Args:
conn:
source_destination_dict: dictionary of destination name to RosCommunicator class
tcp_server: server object
incoming_ip: connected from this IP address
incoming_port: connected from this port
"""
self.conn = conn
self.tcp_server = tcp_server
Expand Down Expand Up @@ -97,16 +99,7 @@ def read_message(conn):
destination = ClientThread.read_string(conn)
full_message_size = ClientThread.read_int32(conn)

while len(data) < full_message_size:
# Only grabs max of 1024 bytes TODO: change to TCPServer's buffer_size
grab = 1024 if full_message_size - len(data) > 1024 else full_message_size - len(data)
packet = ClientThread.recvall(conn, grab)

if not packet:
rospy.logerr("No packets...")
break

data += packet
data = ClientThread.recvall(conn, full_message_size)

if full_message_size > 0 and not data:
rospy.logerr("No data for a message size of {}, breaking!".format(full_message_size))
Expand Down Expand Up @@ -158,16 +151,16 @@ def serialize_command(command, params):
return cmd_info + json_info

def send_ros_service_request(self, srv_id, destination, data):
if destination not in self.tcp_server.source_destination_dict.keys():
error_msg = "Service destination '{}' is not registered! Known topics are: {} ".format(
destination, self.tcp_server.source_destination_dict.keys()
if destination not in self.tcp_server.ros_services.keys():
error_msg = "Service destination '{}' is not registered! Known services are: {} ".format(
destination, self.tcp_server.ros_services.keys()
)
self.tcp_server.send_unity_error(error_msg)
rospy.logerr(error_msg)
# TODO: send a response to Unity anyway?
return
else:
ros_communicator = self.tcp_server.source_destination_dict[destination]
ros_communicator = self.tcp_server.ros_services[destination]
service_thread = threading.Thread(
target=self.service_call_thread, args=(srv_id, destination, data, ros_communicator)
)
Expand All @@ -188,8 +181,8 @@ def service_call_thread(self, srv_id, destination, data, ros_communicator):

def run(self):
"""
Read a message and determine where to send it based on the source_destination_dict
and destination string. Then send the read message.
Receive a message from Unity and determine where to send it based on the publishers table
and topic string. Then send the read message.
If there is a response after sending the serialized data, assume it is a
ROS service response.
Expand All @@ -208,6 +201,7 @@ def run(self):
while not halt_event.is_set():
destination, data = self.read_message(self.conn)

# Process this message that was sent from Unity
if self.tcp_server.pending_srv_id is not None:
# if we've been told that the next message will be a service request/response, process it as such
if self.tcp_server.pending_srv_is_request:
Expand All @@ -225,12 +219,12 @@ def run(self):
elif destination.startswith("__"):
# handle a system command, such as registering new topics
self.tcp_server.handle_syscommand(destination, data)
elif destination in self.tcp_server.source_destination_dict:
ros_communicator = self.tcp_server.source_destination_dict[destination]
response = ros_communicator.send(data)
elif destination in self.tcp_server.publishers:
ros_communicator = self.tcp_server.publishers[destination]
ros_communicator.send(data)
else:
error_msg = "Topic '{}' is not registered! Known topics are: {} ".format(
destination, self.tcp_server.source_destination_dict.keys()
error_msg = "Not registered to publish topic '{}'! Valid publish topics are: {} ".format(
destination, self.tcp_server.publishers.keys()
)
self.tcp_server.send_unity_error(error_msg)
rospy.logerr(error_msg)
Expand Down
5 changes: 2 additions & 3 deletions src/ros_tcp_endpoint/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ class RosPublisher(RosSender):
Class to publish messages to a ROS topic
"""

# TODO: surface latch functionality
def __init__(self, topic, message_class, queue_size=10):
def __init__(self, topic, message_class, queue_size=10, latch=False):
"""
Args:
Expand All @@ -33,7 +32,7 @@ def __init__(self, topic, message_class, queue_size=10):
"""
RosSender.__init__(self)
self.msg = message_class()
self.pub = rospy.Publisher(topic, message_class, queue_size=queue_size)
self.pub = rospy.Publisher(topic, message_class, queue_size=queue_size, latch=latch)

def send(self, data):
"""
Expand Down
45 changes: 22 additions & 23 deletions src/ros_tcp_endpoint/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,21 @@ def __init__(self, node_name, buffer_size=1024, connections=2, tcp_ip="", tcp_po
self.unity_tcp_sender = UnityTcpSender()

self.node_name = node_name
self.source_destination_dict = {}
self.publishers = {}
self.subscribers = {}
self.ros_services = {}
self.unity_services = {}
self.buffer_size = buffer_size
self.connections = connections
self.syscommands = SysCommands(self)
self.pending_srv_id = None
self.pending_srv_is_request = False

def start(self, source_destination_dict=None):
if source_destination_dict is not None:
self.source_destination_dict = source_destination_dict
def start(self, publishers=None, subscribers=None):
if publishers is not None:
self.publishers = publishers
if subscribers is not None:
self.subscribers = subscribers
server_thread = threading.Thread(target=self.listen_loop)
# Exit the server thread when the main thread terminates
server_thread.daemon = True
Expand Down Expand Up @@ -134,14 +139,12 @@ def subscribe(self, topic, message_name):

rospy.loginfo("RegisterSubscriber({}, {}) OK".format(topic, message_class))

if topic in self.tcp_server.source_destination_dict:
self.tcp_server.source_destination_dict[topic].unregister()
if topic in self.tcp_server.subscribers:
self.tcp_server.subscribers[topic].unregister()

self.tcp_server.source_destination_dict[topic] = RosSubscriber(
topic, message_class, self.tcp_server
)
self.tcp_server.subscribers[topic] = RosSubscriber(topic, message_class, self.tcp_server)

def publish(self, topic, message_name):
def publish(self, topic, message_name, queue_size=10, latch=False):
if topic == "":
self.tcp_server.send_unity_error(
"Can't publish to a blank topic name! SysCommand.publish({}, {})".format(
Expand All @@ -159,12 +162,10 @@ def publish(self, topic, message_name):

rospy.loginfo("RegisterPublisher({}, {}) OK".format(topic, message_class))

if topic in self.tcp_server.source_destination_dict:
self.tcp_server.source_destination_dict[topic].unregister()
if topic in self.tcp_server.publishers:
self.tcp_server.publishers[topic].unregister()

self.tcp_server.source_destination_dict[topic] = RosPublisher(
topic, message_class, queue_size=10
)
self.tcp_server.publishers[topic] = RosPublisher(topic, message_class, queue_size, latch)

def ros_service(self, topic, message_name):
if topic == "":
Expand All @@ -186,10 +187,10 @@ def ros_service(self, topic, message_name):

rospy.loginfo("RegisterRosService({}, {}) OK".format(topic, message_class))

if topic in self.tcp_server.source_destination_dict:
self.tcp_server.source_destination_dict[topic].unregister()
if topic in self.tcp_server.ros_services:
self.tcp_server.ros_services[topic].unregister()

self.tcp_server.source_destination_dict[topic] = RosService(topic, message_class)
self.tcp_server.ros_services[topic] = RosService(topic, message_class)

def unity_service(self, topic, message_name):
if topic == "":
Expand All @@ -211,12 +212,10 @@ def unity_service(self, topic, message_name):

rospy.loginfo("RegisterUnityService({}, {}) OK".format(topic, message_class))

if topic in self.tcp_server.source_destination_dict:
self.tcp_server.source_destination_dict[topic].unregister()
if topic in self.tcp_server.unity_services:
self.tcp_server.unity_services[topic].unregister()

self.tcp_server.source_destination_dict[topic] = UnityService(
topic, message_class, self.tcp_server
)
self.tcp_server.unity_services[topic] = UnityService(topic, message_class, self.tcp_server)

def response(self, srv_id): # the next message is a service response
self.tcp_server.pending_srv_id = srv_id
Expand Down
Loading

0 comments on commit 715a402

Please sign in to comment.