diff --git a/providers/base/bin/reboot_check_test.py b/providers/base/bin/reboot_check_test.py index ca4685fee7..f4df73caba 100755 --- a/providers/base/bin/reboot_check_test.py +++ b/providers/base/bin/reboot_check_test.py @@ -10,6 +10,7 @@ import typing as T from checkbox_support.scripts.image_checker import has_desktop_environment from datetime import datetime +import time # Checkbox could run in a snap container, so we need to prepend this root path @@ -50,11 +51,17 @@ class Device: # to modify, add more values in the enum # and reference them in required/optional respectively + COMMAND_TIMEOUT_SECONDS = 30 + def get_drm_info(self) -> str: return str(os.listdir("/sys/class/drm")) def get_wireless_info(self) -> str: - iw_out = sp.check_output(["iw", "dev"], universal_newlines=True) + iw_out = sp.check_output( + ["iw", "dev"], + timeout=self.COMMAND_TIMEOUT_SECONDS, + universal_newlines=True, + ) lines = iw_out.splitlines() lines_to_write = list( filter( @@ -75,6 +82,7 @@ def get_usb_info(self) -> str: "-s", ], universal_newlines=True, + timeout=self.COMMAND_TIMEOUT_SECONDS, ).splitlines() out.sort() return "\n".join(out) @@ -82,6 +90,7 @@ def get_usb_info(self) -> str: def get_pci_info(self) -> str: return sp.check_output( ["lspci", "-i", "{}/usr/share/misc/pci.ids".format(SNAP)], + timeout=self.COMMAND_TIMEOUT_SECONDS, universal_newlines=True, ) @@ -302,6 +311,38 @@ def is_hardware_renderer_available(self) -> bool: print("[ ERR ] Software rendering detected", file=sys.stderr) return False + def wait_for_graphical_target(self, max_wait_seconds: int) -> bool: + """Wait for the DUT to reach graphical.target in systemd critical chain + + :param max_wait_seconds: num seconds to wait at most + :return: whether graphical.target was reached within max_wait_seconds + """ + + start = time.time() + print("Checking if DUT has reached graphical.target...") + while time.time() - start < max_wait_seconds: + try: + out = sp.run( + [ + "systemd-analyze", + "critical-chain", + "graphical.target", + "--no-pager", + ], + stdout=sp.DEVNULL, + stderr=sp.DEVNULL, + timeout=min(5, max_wait_seconds), + ) + if out.returncode == 0: + print("Graphical target reached!") + return True + else: + time.sleep(1) + except sp.TimeoutExpired: + return False + + return False + def parse_unity_support_output( self, unity_output_string: str ) -> T.Dict[str, str]: @@ -390,6 +431,16 @@ def create_parser(): action="store_true", help="If specified, check if hardware rendering is being used", ) + parser.add_argument( + "--graphical-target-timeout", + default=120, + type=int, + dest="graphical_target_timeout", + help="How many seconds should we wait for systemd to report " + "that it has reached graphical.target in its critical chain " + "before the renderer check starts. " + "Default is 120 seconds. Ignored if -g/--graphics is not specified.", + ) return parser @@ -469,7 +520,17 @@ def main() -> int: if args.do_renderer_check: tester = HardwareRendererTester() - if has_desktop_environment() and tester.has_display_connection(): + graphical_target_reached = tester.wait_for_graphical_target( + args.graphical_target_timeout + ) + if not graphical_target_reached: + print( + "[ ERR ] systemd's graphical.target was not reached", + "in {} seconds.".format(args.graphical_target_timeout), + "Marking the renderer test as failed.", + ) + renderer_test_passed = False + elif has_desktop_environment() and tester.has_display_connection(): # skip renderer test if there's no display renderer_test_passed = tester.is_hardware_renderer_available() diff --git a/providers/base/tests/test_reboot_check_test.py b/providers/base/tests/test_reboot_check_test.py index 54d0e6476e..87dc83753c 100644 --- a/providers/base/tests/test_reboot_check_test.py +++ b/providers/base/tests/test_reboot_check_test.py @@ -148,6 +148,51 @@ def test_is_hardware_renderer_available_fail( tester = RCT.HardwareRendererTester() self.assertFalse(tester.is_hardware_renderer_available()) + def test_slow_boot_scenario(self): + + def fake_time(delta: int, ticks=2): + # fake a time.time() delta using closure + call_idx = [0] + + def wrapped(): + if call_idx[0] != ticks: + call_idx[0] += 1 + return 0 # when time.time is initially called + else: + return delta # the "last" time when time.time is called + + return wrapped + + with patch("subprocess.run") as mock_run, patch( + "time.sleep" + ) as mock_sleep, patch("time.time") as mock_time, patch( + "sys.argv", + sh_split("reboot_check_test.py -g --graphical-target-timeout 2"), + ): + mock_run.side_effect = lambda *args, **kwargs: sp.CompletedProcess( + [], + 1, + "systemd says it's not ready", + "graphical target not reached blah", + ) + mock_sleep.side_effect = do_nothing + mock_time.side_effect = fake_time(3) + tester = RCT.HardwareRendererTester() + + self.assertFalse(tester.wait_for_graphical_target(2)) + + mock_sleep.reset_mock() + mock_time.side_effect = fake_time(3) + tester = RCT.HardwareRendererTester() + self.assertEqual(RCT.main(), 1) + self.assertTrue(mock_time.called) + self.assertTrue(mock_sleep.called) + + mock_time.side_effect = fake_time(3) + mock_run.side_effect = sp.TimeoutExpired([], 1) + tester = RCT.HardwareRendererTester() + self.assertFalse(tester.wait_for_graphical_target(2)) + class InfoDumpTests(unittest.TestCase): @classmethod