Skip to content

Commit

Permalink
Add test_ros2trace_analysis pkg with 'ros2 trace-analysis process' tests
Browse files Browse the repository at this point in the history
Signed-off-by: Christophe Bedard <christophe.bedard@apex.ai>
  • Loading branch information
christophebedard committed Jun 14, 2024
1 parent 943bf20 commit 3f41bb2
Show file tree
Hide file tree
Showing 13 changed files with 356 additions and 0 deletions.
4 changes: 4 additions & 0 deletions test_ros2trace_analysis/.coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[run]
omit =
setup.py
test/*
3 changes: 3 additions & 0 deletions test_ros2trace_analysis/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*~
*.pyc

Empty file.
32 changes: 32 additions & 0 deletions test_ros2trace_analysis/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>test_ros2trace_analysis</name>
<version>8.3.0</version>
<description>Tests for the ros2trace_analysis package.</description>
<maintainer email="bedard.christophe@gmail.com">Christophe Bedard</maintainer>
<license>Apache 2.0</license>
<url type="website">https://index.ros.org/p/test_ros2trace_analysis/</url>
<url type="repository">https://github.com/ros-tracing/tracetools_analysis</url>
<url type="bugtracker">https://github.com/ros-tracing/tracetools_analysis/issues</url>
<author email="bedard.christophe@gmail.com">Christophe Bedard</author>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_mypy</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>ament_xmllint</test_depend>
<test_depend>launch</test_depend>
<test_depend>launch_ros</test_depend>
<test_depend>python3-pytest</test_depend>
<test_depend>ros2run</test_depend>
<test_depend>ros2trace</test_depend>
<test_depend>ros2trace_analysis</test_depend>
<test_depend>test_tracetools</test_depend>
<test_depend>tracetools</test_depend>
<test_depend>tracetools_trace</test_depend>

<export>
<build_type>ament_python</build_type>
</export>
</package>
Empty file.
26 changes: 26 additions & 0 deletions test_ros2trace_analysis/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from setuptools import find_packages
from setuptools import setup

package_name = 'test_ros2trace_analysis'

setup(
name=package_name,
version='3.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/' + package_name, ['package.xml']),
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='Christophe Bedard',
maintainer_email='bedard.christophe@gmail.com',
author='Christophe Bedard',
author_email='bedard.christophe@gmail.com',
url='https://github.com/ros-tracing/tracetools_analysis',
keywords=[],
description='Tests for the ros2trace_analysis package.',
license='Apache 2.0',
tests_require=['pytest'],
)
23 changes: 23 additions & 0 deletions test_ros2trace_analysis/test/test_copyright.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ament_copyright.main import main
import pytest


@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'
25 changes: 25 additions & 0 deletions test_ros2trace_analysis/test/test_flake8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ament_flake8.main import main_with_errors
import pytest


@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)
22 changes: 22 additions & 0 deletions test_ros2trace_analysis/test/test_mypy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright 2019 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ament_mypy.main import main
import pytest


@pytest.mark.mypy
@pytest.mark.linter
def test_mypy():
assert main(argv=[]) == 0, 'Found errors'
23 changes: 23 additions & 0 deletions test_ros2trace_analysis/test/test_pep257.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ament_pep257.main import main
import pytest


@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=[])
assert rc == 0, 'Found code style errors / warnings'
175 changes: 175 additions & 0 deletions test_ros2trace_analysis/test/test_ros2trace_analysis/test_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# Copyright 2024 Apex.AI, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from pathlib import Path
import shutil
import subprocess
import tempfile
from typing import Dict
from typing import List
from typing import Optional
import unittest

from launch import LaunchDescription
from launch import LaunchService
from launch_ros.actions import Node
from tracetools_trace.tools.lttng import is_lttng_installed


def are_tracepoints_included() -> bool:
"""
Check if tracing instrumentation is enabled and if tracepoints are included.
:return: True if tracepoints are included, False otherwise
"""
if not is_lttng_installed():
return False
process = subprocess.run(
['ros2', 'run', 'tracetools', 'status'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
)
return 0 == process.returncode


@unittest.skipIf(not is_lttng_installed(minimum_version='2.9.0'), 'LTTng is required')
class TestROS2TraceAnalysisCLI(unittest.TestCase):

def __init__(self, *args) -> None:
super().__init__(
*args,
)

def create_test_tmpdir(self, test_name: str) -> str:
prefix = self.__class__.__name__ + '__' + test_name
return tempfile.mkdtemp(prefix=prefix)

def run_command(
self,
args: List[str],
*,
env: Optional[Dict[str, str]] = None,
) -> subprocess.Popen:
print('=>running:', args)
process_env = os.environ.copy()
process_env['PYTHONUNBUFFERED'] = '1'
if env:
process_env.update(env)
return subprocess.Popen(
args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
env=process_env,
)

def wait_and_print_command_output(
self,
process: subprocess.Popen,
) -> int:
stdout, stderr = process.communicate()
stdout = stdout.strip(' \r\n\t')
stderr = stderr.strip(' \r\n\t')
print('=>stdout:\n' + stdout)
print('=>stderr:\n' + stderr)
return process.wait()

def run_command_and_wait(
self,
args: List[str],
*,
env: Optional[Dict[str, str]] = None,
) -> int:
process = self.run_command(args, env=env)
return self.wait_and_print_command_output(process)

def run_nodes(self) -> None:
nodes = [
Node(
package='test_tracetools',
executable='test_ping',
output='screen',
),
Node(
package='test_tracetools',
executable='test_pong',
output='screen',
),
]
ld = LaunchDescription(nodes)
ls = LaunchService()
ls.include_launch_description(ld)
exit_code = ls.run()
self.assertEqual(0, exit_code)

def test_process_bad_input_path(self) -> None:
tmpdir = self.create_test_tmpdir('test_process_bad_input_path')

# No input path
ret = self.run_command_and_wait(['ros2', 'trace-analysis', 'process'])
self.assertEqual(2, ret)

# Does not exist
ret = self.run_command_and_wait(['ros2', 'trace-analysis', 'process', ''])
self.assertEqual(1, ret)
fake_input = os.path.join(tmpdir, 'doesnt_exist')
ret = self.run_command_and_wait(['ros2', 'trace-analysis', 'process', fake_input])
self.assertEqual(1, ret)

# Exists but empty
empty_input = os.path.join(tmpdir, 'empty')
os.mkdir(empty_input)
ret = self.run_command_and_wait(['ros2', 'trace-analysis', 'process', empty_input])
self.assertEqual(1, ret)

# Exists but converted file empty
empty_converted_file = os.path.join(empty_input, 'converted')
Path(empty_converted_file).touch()
ret = self.run_command_and_wait(['ros2', 'trace-analysis', 'process', empty_input])
self.assertEqual(1, ret)

shutil.rmtree(tmpdir)

@unittest.skipIf(not are_tracepoints_included(), 'tracepoints are required')
def test_process(self) -> None:
tmpdir = self.create_test_tmpdir('test_process')
session_name = 'test_process'

# Run and trace nodes
ret = self.run_command_and_wait(
[
'ros2', 'trace',
'start', session_name,
'--path', tmpdir,
],
)
self.assertEqual(0, ret)
trace_dir = os.path.join(tmpdir, session_name)
self.run_nodes()
ret = self.run_command_and_wait(['ros2', 'trace', 'stop', session_name])
self.assertEqual(0, ret)

# Process trace
ret = self.run_command_and_wait(['ros2', 'trace-analysis', 'process', trace_dir])
self.assertEqual(0, ret)

# Check that converted file exists and isn't empty
converted_file = os.path.join(trace_dir, 'converted')
self.assertTrue(os.path.isfile(converted_file))
self.assertGreater(os.path.getsize(converted_file), 0)

shutil.rmtree(tmpdir)
23 changes: 23 additions & 0 deletions test_ros2trace_analysis/test/test_xmllint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ament_xmllint.main import main
import pytest


@pytest.mark.linter
@pytest.mark.xmllint
def test_xmllint():
rc = main(argv=[])
assert rc == 0, 'Found errors'
Empty file.

0 comments on commit 3f41bb2

Please sign in to comment.