From c9dce94d316028b15d9f2781fee959da745aea52 Mon Sep 17 00:00:00 2001 From: James Falcon Date: Tue, 24 Sep 2024 15:07:43 -0500 Subject: [PATCH] test: Refactor test_cc_set_hostname.py and test_cc_ntp.py (#5727) * pytestify * remove FilesystemMockingTestCase base class * fix typing * general cleanup Any typing changes are to satisfy mypy. --- cloudinit/config/cc_ntp.py | 13 +- conftest.py | 1 + pyproject.toml | 2 - tests/unittests/config/test_cc_ntp.py | 425 +++++++++--------- .../unittests/config/test_cc_set_hostname.py | 357 ++++++--------- tests/unittests/util.py | 43 +- 6 files changed, 371 insertions(+), 470 deletions(-) diff --git a/cloudinit/config/cc_ntp.py b/cloudinit/config/cc_ntp.py index e2b83191a19..6e7bd5c2978 100644 --- a/cloudinit/config/cc_ntp.py +++ b/cloudinit/config/cc_ntp.py @@ -9,6 +9,7 @@ import copy import logging import os +from typing import Dict, Mapping from cloudinit import subp, temp_utils, templater, type_utils, util from cloudinit.cloud import Cloud @@ -98,7 +99,7 @@ } # This is Distro-specific configuration overrides of the base config -DISTRO_CLIENT_CONFIG = { +DISTRO_CLIENT_CONFIG: Dict[str, Dict] = { "alpine": { "chrony": { "confpath": "/etc/chrony/chrony.conf", @@ -279,7 +280,7 @@ def distro_ntp_client_configs(distro): return cfg -def select_ntp_client(ntp_client, distro): +def select_ntp_client(ntp_client, distro) -> Mapping: """Determine which ntp client is to be used, consulting the distro for its preference. @@ -318,7 +319,7 @@ def select_ntp_client(ntp_client, distro): 'Selected distro preferred NTP client "%s", not yet installed', client, ) - clientcfg = distro_cfg.get(client) + clientcfg = distro_cfg.get(client, {}) else: LOG.debug( 'Selected NTP client "%s" via distro system config', @@ -565,7 +566,7 @@ def handle(name: str, cfg: Config, cloud: Cloud, args: list) -> None: template_fn = None if not ntp_client_config.get("template"): - template_name = ntp_client_config.get("template_name").replace( + template_name = ntp_client_config["template_name"].replace( "{distro}", cloud.distro.name ) template_fn = cloud.get_template_filename(template_name) @@ -611,14 +612,14 @@ def handle(name: str, cfg: Config, cloud: Cloud, args: list) -> None: try: cloud.distro.manage_service( - "enable", ntp_client_config.get("service_name") + "enable", ntp_client_config["service_name"] ) except subp.ProcessExecutionError as e: LOG.exception("Failed to enable ntp service: %s", e) raise try: cloud.distro.manage_service( - "reload", ntp_client_config.get("service_name") + "reload", ntp_client_config["service_name"] ) except subp.ProcessExecutionError as e: LOG.exception("Failed to reload/start ntp service: %s", e) diff --git a/conftest.py b/conftest.py index 77111a81cab..36320f237dc 100644 --- a/conftest.py +++ b/conftest.py @@ -208,6 +208,7 @@ def paths(tmpdir): "cloud_dir": tmpdir.mkdir("cloud_dir").strpath, "docs_dir": tmpdir.mkdir("docs_dir").strpath, "run_dir": tmpdir.mkdir("run_dir").strpath, + "templates_dir": tmpdir.mkdir("templates_dir").strpath, } return helpers.Paths(dirs) diff --git a/pyproject.toml b/pyproject.toml index 97760a520b7..86a1d7a20af 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -138,7 +138,6 @@ module = [ "tests.unittests.config.test_cc_locale", "tests.unittests.config.test_cc_mcollective", "tests.unittests.config.test_cc_mounts", - "tests.unittests.config.test_cc_ntp", "tests.unittests.config.test_cc_phone_home", "tests.unittests.config.test_cc_power_state_change", "tests.unittests.config.test_cc_puppet", @@ -147,7 +146,6 @@ module = [ "tests.unittests.config.test_cc_rh_subscription", "tests.unittests.config.test_cc_rsyslog", "tests.unittests.config.test_cc_runcmd", - "tests.unittests.config.test_cc_set_hostname", "tests.unittests.config.test_cc_snap", "tests.unittests.config.test_cc_ssh", "tests.unittests.config.test_cc_timezone", diff --git a/tests/unittests/config/test_cc_ntp.py b/tests/unittests/config/test_cc_ntp.py index ead6f5213f3..c28da73a213 100644 --- a/tests/unittests/config/test_cc_ntp.py +++ b/tests/unittests/config/test_cc_ntp.py @@ -3,24 +3,19 @@ import os import re import shutil -from functools import partial from os.path import dirname +from typing import Any, Dict, List import pytest -from cloudinit import helpers, util +from cloudinit import util from cloudinit.config import cc_ntp from cloudinit.config.schema import ( SchemaValidationError, get_schema, validate_cloudconfig_schema, ) -from tests.unittests.helpers import ( - CiTestCase, - FilesystemMockingTestCase, - mock, - skipUnlessJsonSchema, -) +from tests.unittests.helpers import mock, skipUnlessJsonSchema from tests.unittests.util import get_cloud NTP_TEMPLATE = """\ @@ -38,48 +33,37 @@ """ -class TestNtp(FilesystemMockingTestCase): - with_logs = True - - def setUp(self): - super(TestNtp, self).setUp() - self.new_root = self.tmp_dir() - self.add_patch("cloudinit.util.system_is_snappy", "m_snappy") - self.m_snappy.return_value = False - self.new_root = self.reRoot() - self._get_cloud = partial( - get_cloud, paths=helpers.Paths({"templates_dir": self.new_root}) - ) +class TestNtp: + @pytest.fixture + def service_mocks(self, mocker): + mocker.patch("cloudinit.config.cc_ntp.install_ntp_client") + mocker.patch("cloudinit.config.cc_ntp.util.is_BSD", return_value=False) - def _get_template_path(self, template_name, distro, basepath=None): + def _get_template_path(self, template_name, distro, templates_dir): # ntp.conf.{distro} -> ntp.conf.debian.tmpl template_fn = "{0}.tmpl".format( template_name.replace("{distro}", distro) ) - if not basepath: - basepath = self.new_root - path = os.path.join(basepath, template_fn) + path = os.path.join(templates_dir, template_fn) return path - def _generate_template(self, template=None): - if not template: - template = NTP_TEMPLATE - confpath = os.path.join(self.new_root, "client.conf") - template_fn = os.path.join(self.new_root, "client.conf.tmpl") + def _generate_template(self, templates_dir, template=NTP_TEMPLATE): + confpath = os.path.join(templates_dir, "client.conf") + template_fn = os.path.join(templates_dir, "client.conf.tmpl") util.write_file(template_fn, content=template) return (confpath, template_fn) - def _mock_ntp_client_config(self, client=None, distro=None): - if not client: - client = "ntp" - if not distro: - distro = "ubuntu" + def _mock_ntp_client_config( + self, templates_dir, client="ntp", distro="ubuntu" + ): dcfg = cc_ntp.distro_ntp_client_configs(distro) if client == "systemd-timesyncd": template = TIMESYNCD_TEMPLATE else: template = NTP_TEMPLATE - (confpath, _template_fn) = self._generate_template(template=template) + (confpath, _template_fn) = self._generate_template( + templates_dir, template=template + ) ntpconfig = copy.deepcopy(dcfg[client]) ntpconfig["confpath"] = confpath ntpconfig["template_name"] = os.path.basename(confpath) @@ -117,41 +101,43 @@ def test_ntp_install_no_op_with_empty_pkg_list(self, mock_subp): ) install_func.assert_called_once_with([]) - def test_ntp_rename_ntp_conf(self): + def test_ntp_rename_ntp_conf(self, tmpdir): """When NTP_CONF exists, rename_ntp moves it.""" - ntpconf = self.tmp_path("ntp.conf", self.new_root) + ntpconf = os.path.join(tmpdir, "ntp.conf") util.write_file(ntpconf, "") cc_ntp.rename_ntp_conf(confpath=ntpconf) - self.assertFalse(os.path.exists(ntpconf)) - self.assertTrue(os.path.exists("{0}.dist".format(ntpconf))) + assert not os.path.exists(ntpconf) + assert os.path.exists("{0}.dist".format(ntpconf)) - def test_ntp_rename_ntp_conf_skip_missing(self): + def test_ntp_rename_ntp_conf_skip_missing(self, tmp_path): """When NTP_CONF doesn't exist rename_ntp doesn't create a file.""" - ntpconf = self.tmp_path("ntp.conf", self.new_root) - self.assertFalse(os.path.exists(ntpconf)) + ntpconf = tmp_path / "ntp.conf" + assert not os.path.exists(ntpconf) cc_ntp.rename_ntp_conf(confpath=ntpconf) - self.assertFalse(os.path.exists("{0}.dist".format(ntpconf))) - self.assertFalse(os.path.exists(ntpconf)) + assert not os.path.exists("{0}.dist".format(ntpconf)) + assert not os.path.exists(ntpconf) - def test_write_ntp_config_template_uses_ntp_conf_distro_no_servers(self): + def test_write_ntp_config_template_uses_ntp_conf_distro_no_servers( + self, tmpdir + ): """write_ntp_config_template reads from $client.conf.distro.tmpl""" - servers = [] - pools = ["10.0.0.1", "10.0.0.2"] - (confpath, template_fn) = self._generate_template() + (confpath, template_fn) = self._generate_template(tmpdir) cc_ntp.write_ntp_config_template( "ubuntu", - servers=servers, - pools=pools, + servers=[], + pools=["10.0.0.1", "10.0.0.2"], path=confpath, template_fn=template_fn, template=None, ) - self.assertEqual( - "servers []\npools ['10.0.0.1', '10.0.0.2']\n", - util.load_text_file(confpath), + assert ( + "servers []\npools ['10.0.0.1', '10.0.0.2']\n" + == util.load_text_file(confpath) ) - def test_write_ntp_config_template_defaults_pools_w_empty_lists(self): + def test_write_ntp_config_template_defaults_pools_w_empty_lists( + self, tmpdir + ): """write_ntp_config_template defaults pools servers upon empty config. When both pools and servers are empty, default NR_POOL_SERVERS get @@ -159,22 +145,20 @@ def test_write_ntp_config_template_defaults_pools_w_empty_lists(self): """ distro = "ubuntu" pools = cc_ntp.generate_server_names(distro) - servers = [] - (confpath, template_fn) = self._generate_template() + (confpath, template_fn) = self._generate_template(tmpdir) cc_ntp.write_ntp_config_template( distro, - servers=servers, + servers=[], pools=pools, path=confpath, template_fn=template_fn, template=None, ) - self.assertEqual( - "servers []\npools {0}\n".format(pools), - util.load_text_file(confpath), + assert "servers []\npools {0}\n".format(pools) == util.load_text_file( + confpath ) - def test_defaults_pools_empty_lists_sles(self): + def test_defaults_pools_empty_lists_sles(self, tmpdir, caplog): """write_ntp_config_template defaults opensuse pools upon empty config. When both pools and servers are empty, default NR_POOL_SERVERS get @@ -182,7 +166,7 @@ def test_defaults_pools_empty_lists_sles(self): """ distro = "sles" default_pools = cc_ntp.generate_server_names(distro) - (confpath, template_fn) = self._generate_template() + (confpath, template_fn) = self._generate_template(tmpdir) cc_ntp.write_ntp_config_template( distro, @@ -193,24 +177,23 @@ def test_defaults_pools_empty_lists_sles(self): template=None, ) for pool in default_pools: - self.assertIn("opensuse", pool) - self.assertEqual( - "servers []\npools {0}\n".format(default_pools), - util.load_text_file(confpath), - ) - self.assertIn( + assert "opensuse" in pool + assert "servers []\npools {0}\n".format( + default_pools + ) == util.load_text_file(confpath) + assert ( "Adding distro default ntp pool servers: {0}".format( ",".join(default_pools) - ), - self.logs.getvalue(), + ) + in caplog.text ) - def test_timesyncd_template(self): + def test_timesyncd_template(self, tmpdir): """Test timesycnd template is correct""" pools = ["0.mycompany.pool.ntp.org", "3.mycompany.pool.ntp.org"] servers = ["192.168.23.3", "192.168.23.4"] (confpath, template_fn) = self._generate_template( - template=TIMESYNCD_TEMPLATE + tmpdir, template=TIMESYNCD_TEMPLATE ) cc_ntp.write_ntp_config_template( "ubuntu", @@ -220,10 +203,10 @@ def test_timesyncd_template(self): template_fn=template_fn, template=None, ) - self.assertEqual( - "[Time]\nNTP=%s %s \n" % (" ".join(servers), " ".join(pools)), - util.load_text_file(confpath), - ) + assert "[Time]\nNTP=%s %s \n" % ( + " ".join(servers), + " ".join(pools), + ) == util.load_text_file(confpath) def test_distro_ntp_client_configs(self): """Test we have updated ntp client configs on different distros""" @@ -233,52 +216,50 @@ def test_distro_ntp_client_configs(self): for distro in cc_ntp.distros: if distro not in delta: result = cc_ntp.distro_ntp_client_configs(distro) - self.assertEqual(base, result) + assert base == result # for distros with delta, ensure the merged config values match # what is set in the delta for distro in delta.keys(): result = cc_ntp.distro_ntp_client_configs(distro) for client in delta[distro].keys(): for key in delta[distro][client].keys(): - self.assertEqual( - delta[distro][client][key], result[client][key] - ) - - def _get_expected_pools(self, pools, distro, client): - expected_pools = None - if client in ["ntp", "chrony"]: - if client == "ntp" and distro == "alpine": - # NTP for Alpine Linux is Busybox's ntp which does not - # support 'pool' lines in its configuration file. - expected_pools = [] - else: - expected_pools = [ - "pool {0} iburst".format(pool) for pool in pools - ] + assert delta[distro][client][key] == result[client][key] + + def _get_expected_pools( + self, pools: List[str], distro, client + ) -> List[str]: + if client == "ntp" and distro == "alpine": + # NTP for Alpine Linux is Busybox's ntp which does not + # support 'pool' lines in its configuration file. + expected_pools = [] + elif client in ["ntp", "chrony"]: + expected_pools = ["pool {0} iburst".format(pool) for pool in pools] elif client == "systemd-timesyncd": - expected_pools = " ".join(pools) + expected_pools = pools + else: + raise RuntimeError(f"Unknown client: {client}") return expected_pools - def _get_expected_servers(self, servers, distro, client): - expected_servers = None - if client in ["ntp", "chrony"]: - if client == "ntp" and distro == "alpine": - # NTP for Alpine Linux is Busybox's ntp which only supports - # 'server' lines without iburst option. - expected_servers = [ - "server {0}".format(srv) for srv in servers - ] - else: - expected_servers = [ - "server {0} iburst".format(srv) for srv in servers - ] + def _get_expected_servers( + self, servers: List[str], distro, client + ) -> List[str]: + if client == "ntp" and distro == "alpine": + # NTP for Alpine Linux is Busybox's ntp which only supports + # 'server' lines without iburst option. + expected_servers = ["server {0}".format(srv) for srv in servers] + elif client in ["ntp", "chrony"]: + expected_servers = [ + "server {0} iburst".format(srv) for srv in servers + ] elif client == "systemd-timesyncd": - expected_servers = " ".join(servers) + expected_servers = servers + else: + raise RuntimeError(f"Unknown client: {client}") return expected_servers - def test_ntp_handler_real_distro_ntp_templates(self): + def test_ntp_handler_real_distro_ntp_templates(self, tmpdir): """Test ntp handler renders the shipped distro ntp client templates.""" pools = ["0.mycompany.pool.ntp.org", "3.mycompany.pool.ntp.org"] servers = ["192.168.23.3", "192.168.23.4"] @@ -286,9 +267,7 @@ def test_ntp_handler_real_distro_ntp_templates(self): for distro in cc_ntp.distros: distro_cfg = cc_ntp.distro_ntp_client_configs(distro) ntpclient = distro_cfg[client] - confpath = os.path.join( - self.new_root, ntpclient.get("confpath")[1:] - ) + confpath = os.path.join(tmpdir, ntpclient["confpath"][1:]) template = ntpclient.get("template_name") # find sourcetree template file root_dir = ( @@ -296,9 +275,9 @@ def test_ntp_handler_real_distro_ntp_templates(self): + "/templates" ) source_fn = self._get_template_path( - template, distro, basepath=root_dir + template, distro, templates_dir=root_dir ) - template_fn = self._get_template_path(template, distro) + template_fn = self._get_template_path(template, distro, tmpdir) # don't fail if cloud-init doesn't have a template for # a distro,client pair if not os.path.exists(source_fn): @@ -318,25 +297,21 @@ def test_ntp_handler_real_distro_ntp_templates(self): expected_servers = self._get_expected_servers( servers, distro, client ) - print("distro=%s client=%s" % (distro, client)) + print(f"distro={distro} client={client}") for sline in expected_servers: - self.assertIn( - sline, - content_lines, - "failed to render {0} conf for distro:{1}".format( - client, distro - ), + assert ( + sline in content_lines + ), "failed to render {0} conf for distro:{1}".format( + client, distro ) expected_pools = self._get_expected_pools( pools, distro, client ) if expected_pools != []: for pline in expected_pools: - self.assertIn( - pline, - content_lines, + assert pline in content_lines, ( "failed to render {0} conf" - " for distro:{1}".format(client, distro), + " for distro:{1}".format(client, distro) ) elif client == "systemd-timesyncd": expected_servers = self._get_expected_servers( @@ -349,74 +324,79 @@ def test_ntp_handler_real_distro_ntp_templates(self): "# cloud-init generated file\n" "# See timesyncd.conf(5) for details.\n\n" "[Time]\nNTP=%s %s \n" - % (expected_servers, expected_pools) + % ( + " ".join(expected_servers), + " ".join(expected_pools), + ) ) - self.assertEqual(expected_content, content) + assert expected_content == content - def test_no_ntpcfg_does_nothing(self): + def test_no_ntpcfg_does_nothing(self, caplog): """When no ntp section is defined handler logs a warning and noops.""" - cc_ntp.handle("cc_ntp", {}, None, []) - self.assertEqual( - "DEBUG: Skipping module named cc_ntp, " - "not present or disabled by cfg\n", - self.logs.getvalue(), + cc_ntp.handle("cc_ntp", {}, get_cloud(), []) + assert ( + "Skipping module named cc_ntp, " + "not present or disabled by cfg\n" in caplog.text ) - @mock.patch("cloudinit.config.cc_ntp.select_ntp_client") def test_ntp_handler_schema_validation_allows_empty_ntp_config( - self, m_select + self, service_mocks, mocker, paths, caplog ): """Ntp schema validation allows for an empty ntp: configuration.""" - valid_empty_configs = [{"ntp": {}}, {"ntp": None}] + m_select = mocker.patch("cloudinit.config.cc_ntp.select_ntp_client") + + valid_empty_configs: List[Dict] = [{"ntp": {}}, {"ntp": None}] for valid_empty_config in valid_empty_configs: for distro in cc_ntp.distros: # skip the test if the distro is COS. As in COS, the default # config file is installed if distro == "cos": - return - mycloud = self._get_cloud(distro) - ntpconfig = self._mock_ntp_client_config(distro=distro) + continue + mycloud = get_cloud(distro, paths=paths) + ntpconfig = self._mock_ntp_client_config( + paths.cfgs["templates_dir"], distro=distro + ) confpath = ntpconfig["confpath"] m_select.return_value = ntpconfig - cc_ntp.handle("cc_ntp", valid_empty_config, mycloud, []) + + with mock.patch.object(mycloud.distro, "manage_service"): + cc_ntp.handle("cc_ntp", valid_empty_config, mycloud, []) + if distro == "alpine": # _mock_ntp_client_config call above did not specify a # client value and so it defaults to "ntp" which on # Alpine Linux only supports servers and not pools. servers = cc_ntp.generate_server_names(mycloud.distro.name) - self.assertEqual( - "servers {0}\npools []\n".format(servers), - util.load_text_file(confpath), - ) + assert "servers {0}\npools []\n".format( + servers + ) == util.load_text_file(confpath) else: pools = cc_ntp.generate_server_names(mycloud.distro.name) - self.assertEqual( - "servers []\npools {0}\n".format(pools), - util.load_text_file(confpath), - ) - self.assertNotIn( - "Invalid cloud-config provided:", self.logs.getvalue() - ) + assert "servers []\npools {0}\n".format( + pools + ) == util.load_text_file(confpath) + assert "Invalid cloud-config provided:" not in caplog.text - @mock.patch("cloudinit.config.cc_ntp.select_ntp_client") - def test_ntp_handler_timesyncd(self, m_select): + def test_ntp_handler_timesyncd(self, service_mocks, mocker, tmpdir, paths): """Test ntp handler configures timesyncd""" + m_select = mocker.patch("cloudinit.config.cc_ntp.select_ntp_client") servers = ["192.168.2.1", "192.168.2.2"] pools = ["0.mypool.org"] cfg = {"ntp": {"servers": servers, "pools": pools}} client = "systemd-timesyncd" for distro in cc_ntp.distros: - mycloud = self._get_cloud(distro) + mycloud = get_cloud(distro, paths=paths) ntpconfig = self._mock_ntp_client_config( - distro=distro, client=client + paths.cfgs["templates_dir"], distro=distro, client=client ) confpath = ntpconfig["confpath"] m_select.return_value = ntpconfig - cc_ntp.handle("cc_ntp", cfg, mycloud, []) - self.assertEqual( - "[Time]\nNTP=192.168.2.1 192.168.2.2 0.mypool.org \n", - util.load_text_file(confpath), + with mock.patch.object(mycloud.distro, "manage_service"): + cc_ntp.handle("cc_ntp", cfg, mycloud, []) + assert ( + "[Time]\nNTP=192.168.2.1 192.168.2.2 0.mypool.org \n" + == util.load_text_file(confpath) ) @mock.patch("cloudinit.config.cc_ntp.select_ntp_client") @@ -424,21 +404,25 @@ def test_ntp_handler_enabled_false(self, m_select): """Test ntp handler does not run if enabled: false""" cfg = {"ntp": {"enabled": False}} for distro in cc_ntp.distros: - mycloud = self._get_cloud(distro) - cc_ntp.handle("notimportant", cfg, mycloud, None) - self.assertEqual(0, m_select.call_count) + mycloud = get_cloud(distro) + cc_ntp.handle("notimportant", cfg, mycloud, []) + assert 0 == m_select.call_count @mock.patch("cloudinit.subp.subp") @mock.patch("cloudinit.subp.which", return_value=True) @mock.patch("cloudinit.config.cc_ntp.select_ntp_client") @mock.patch("cloudinit.distros.Distro.uses_systemd") - def test_ntp_the_whole_package(self, m_sysd, m_select, m_which, m_subp): + def test_ntp_the_whole_package( + self, m_sysd, m_select, m_which, m_subp, tmpdir, paths + ): """Test enabled config renders template, and restarts service""" cfg = {"ntp": {"enabled": True}} for distro in cc_ntp.distros: m_subp.reset_mock() - mycloud = self._get_cloud(distro) - ntpconfig = self._mock_ntp_client_config(distro=distro) + mycloud = get_cloud(distro, paths=paths) + ntpconfig = self._mock_ntp_client_config( + paths.cfgs["templates_dir"], distro=distro + ) confpath = ntpconfig["confpath"] service_name = ntpconfig["service_name"] m_select.return_value = ntpconfig @@ -457,7 +441,7 @@ def test_ntp_the_whole_package(self, m_sysd, m_select, m_which, m_subp): # skip the test if the distro is COS. As in COS, the default # config file is installed if distro == "cos": - return + continue if distro == "alpine": uses_systemd = False @@ -500,45 +484,41 @@ def test_ntp_the_whole_package(self, m_sysd, m_select, m_which, m_subp): m_util.is_BSD.return_value = is_FreeBSD or is_OpenBSD m_util.is_FreeBSD.return_value = is_FreeBSD m_util.is_OpenBSD.return_value = is_OpenBSD - cc_ntp.handle("notimportant", cfg, mycloud, None) + cc_ntp.handle("notimportant", cfg, mycloud, []) m_subp.assert_called_with( expected_service_call, capture=True, rcs=None ) - self.assertEqual(expected_content, util.load_text_file(confpath)) + assert expected_content == util.load_text_file(confpath) @mock.patch("cloudinit.util.system_info") def test_opensuse_picks_chrony(self, m_sysinfo): """Test opensuse picks chrony or ntp on certain distro versions""" # < 15.0 => ntp m_sysinfo.return_value = {"dist": ("openSUSE", "13.2", "Harlequin")} - mycloud = self._get_cloud("opensuse") + mycloud = get_cloud("opensuse") expected_client = mycloud.distro.preferred_ntp_clients[0] - self.assertEqual("ntp", expected_client) + assert "ntp" == expected_client # >= 15.0 and not openSUSE => chrony m_sysinfo.return_value = { "dist": ("SLES", "15.0", "SUSE Linux Enterprise Server 15") } - mycloud = self._get_cloud("sles") + mycloud = get_cloud("sles") expected_client = mycloud.distro.preferred_ntp_clients[0] - self.assertEqual("chrony", expected_client) + assert "chrony" == expected_client # >= 15.0 and openSUSE and ver != 42 => chrony m_sysinfo.return_value = { "dist": ("openSUSE Tumbleweed", "20180326", "timbleweed") } - mycloud = self._get_cloud("opensuse") + mycloud = get_cloud("opensuse") expected_client = mycloud.distro.preferred_ntp_clients[0] - self.assertEqual("chrony", expected_client) + assert "chrony" == expected_client @mock.patch("cloudinit.config.cc_ntp.subp.which") def test_snappy_system_picks_timesyncd(self, m_which): """Test snappy systems prefer installed clients""" - - # we are on ubuntu-core here - self.m_snappy.return_value = True - # ubuntu core systems will have timesyncd installed, so simulate that. # First None is for the 'eatmydata' check when initializing apt # when initializing the distro class. The rest represent possible @@ -547,19 +527,19 @@ def test_snappy_system_picks_timesyncd(self, m_which): [None, None, "/lib/systemd/systemd-timesyncd", None, None, None] ) distro = "ubuntu" - mycloud = self._get_cloud(distro) + mycloud = get_cloud(distro) distro_configs = cc_ntp.distro_ntp_client_configs(distro) expected_client = "systemd-timesyncd" expected_cfg = distro_configs[expected_client] expected_calls = [] # we only get to timesyncd - for client in mycloud.distro.preferred_ntp_clients[0:2]: + for client in mycloud.distro.preferred_ntp_clients[:2]: cfg = distro_configs[client] expected_calls.append(mock.call(cfg["check_exe"])) result = cc_ntp.select_ntp_client(None, mycloud.distro) m_which.assert_has_calls(expected_calls) - self.assertEqual(sorted(expected_cfg), sorted(cfg)) - self.assertEqual(sorted(expected_cfg), sorted(result)) + assert sorted(expected_cfg) == sorted(cfg) + assert sorted(expected_cfg) == sorted(result) @mock.patch("cloudinit.config.cc_ntp.subp.which") def test_ntp_distro_searches_all_preferred_clients(self, m_which): @@ -567,7 +547,7 @@ def test_ntp_distro_searches_all_preferred_clients(self, m_which): # nothing is installed m_which.return_value = None for distro in cc_ntp.distros: - mycloud = self._get_cloud(distro) + mycloud = get_cloud(distro) distro_configs = cc_ntp.distro_ntp_client_configs(distro) expected_client = mycloud.distro.preferred_ntp_clients[0] expected_cfg = distro_configs[expected_client] @@ -577,7 +557,7 @@ def test_ntp_distro_searches_all_preferred_clients(self, m_which): expected_calls.append(mock.call(cfg["check_exe"])) cc_ntp.select_ntp_client({}, mycloud.distro) m_which.assert_has_calls(expected_calls) - self.assertEqual(sorted(expected_cfg), sorted(cfg)) + assert sorted(expected_cfg) == sorted(cfg) @mock.patch("cloudinit.config.cc_ntp.subp.which") def test_user_cfg_ntp_client_auto_uses_distro_clients(self, m_which): @@ -585,7 +565,7 @@ def test_user_cfg_ntp_client_auto_uses_distro_clients(self, m_which): # nothing is installed m_which.return_value = None for distro in cc_ntp.distros: - mycloud = self._get_cloud(distro) + mycloud = get_cloud(distro) distro_configs = cc_ntp.distro_ntp_client_configs(distro) expected_client = mycloud.distro.preferred_ntp_clients[0] expected_cfg = distro_configs[expected_client] @@ -595,7 +575,7 @@ def test_user_cfg_ntp_client_auto_uses_distro_clients(self, m_which): expected_calls.append(mock.call(cfg["check_exe"])) cc_ntp.select_ntp_client("auto", mycloud.distro) m_which.assert_has_calls(expected_calls) - self.assertEqual(sorted(expected_cfg), sorted(cfg)) + assert sorted(expected_cfg) == sorted(cfg) @mock.patch("cloudinit.config.cc_ntp.write_ntp_config_template") @mock.patch("cloudinit.cloud.Cloud.get_template_filename") @@ -609,11 +589,13 @@ def test_ntp_custom_client_overrides_installed_clients( for distro in cc_ntp.distros: # client is not installed m_which.return_value = None - mycloud = self._get_cloud(distro) + mycloud = get_cloud(distro) with mock.patch.object( mycloud.distro, "install_packages" - ) as m_install: - cc_ntp.handle("notimportant", cfg, mycloud, None) + ) as m_install, mock.patch.object( + mycloud.distro, "manage_service" + ): + cc_ntp.handle("notimportant", cfg, mycloud, []) m_install.assert_called_with([client]) m_which.assert_called_with(client) @@ -625,11 +607,11 @@ def test_ntp_system_config_overrides_distro_builtin_clients(self, m_which): # no clients installed m_which.return_value = None for distro in cc_ntp.distros: - mycloud = self._get_cloud(distro, sys_cfg=sys_cfg) + mycloud = get_cloud(distro, sys_cfg=sys_cfg) distro_configs = cc_ntp.distro_ntp_client_configs(distro) expected_cfg = distro_configs[system_client] result = cc_ntp.select_ntp_client(None, mycloud.distro) - self.assertEqual(sorted(expected_cfg), sorted(result)) + assert sorted(expected_cfg) == sorted(result) m_which.assert_has_calls([]) @mock.patch("cloudinit.config.cc_ntp.subp.which") @@ -641,18 +623,18 @@ def test_ntp_user_config_overrides_system_cfg(self, m_which): # no clients installed m_which.return_value = None for distro in cc_ntp.distros: - mycloud = self._get_cloud(distro, sys_cfg=sys_cfg) + mycloud = get_cloud(distro, sys_cfg=sys_cfg) distro_configs = cc_ntp.distro_ntp_client_configs(distro) expected_cfg = distro_configs[user_client] result = cc_ntp.select_ntp_client(user_client, mycloud.distro) - self.assertEqual(sorted(expected_cfg), sorted(result)) + assert sorted(expected_cfg) == sorted(result) m_which.assert_has_calls([]) @mock.patch("cloudinit.config.cc_ntp.install_ntp_client") - def test_ntp_user_provided_config_with_template(self, m_install): + def test_ntp_user_provided_config_with_template(self, m_install, tmpdir): custom = r"\n#MyCustomTemplate" user_template = NTP_TEMPLATE + custom - confpath = os.path.join(self.new_root, "etc/myntp/myntp.conf") + confpath = os.path.join(tmpdir, "etc/myntp/myntp.conf") cfg = { "ntp": { "pools": ["mypool.org"], @@ -666,21 +648,23 @@ def test_ntp_user_provided_config_with_template(self, m_install): }, } } + mock_path = "cloudinit.config.cc_ntp.temp_utils.get_tmp_ancestor" for distro in cc_ntp.distros: - mycloud = self._get_cloud(distro) - mock_path = "cloudinit.config.cc_ntp.temp_utils.get_tmp_ancestor" - with mock.patch(mock_path, lambda *_: self.new_root): - cc_ntp.handle("notimportant", cfg, mycloud, None) - self.assertEqual( - "servers []\npools ['mypool.org']\n%s" % custom, - util.load_text_file(confpath), + mycloud = get_cloud(distro) + with mock.patch(mock_path, lambda *_: tmpdir), mock.patch.object( + mycloud.distro, "manage_service" + ): + cc_ntp.handle("notimportant", cfg, mycloud, []) + assert ( + "servers []\npools ['mypool.org']\n%s" % custom + == util.load_text_file(confpath) ) @mock.patch("cloudinit.config.cc_ntp.supplemental_schema_validation") @mock.patch("cloudinit.config.cc_ntp.install_ntp_client") @mock.patch("cloudinit.config.cc_ntp.select_ntp_client") def test_ntp_user_provided_config_template_only( - self, m_select, m_install, m_schema + self, m_select, m_install, m_schema, tmpdir ): """Test custom template for default client""" custom = r"\n#MyCustomTemplate" @@ -695,43 +679,44 @@ def test_ntp_user_provided_config_template_only( } expected_merged_cfg = { "check_exe": "chronyd", - "confpath": "{tmpdir}/client.conf".format(tmpdir=self.new_root), + "confpath": f"{tmpdir}/client.conf", "template_name": "client.conf", "template": user_template, "service_name": "chrony", "packages": ["chrony"], } for distro in cc_ntp.distros: - mycloud = self._get_cloud(distro) + mycloud = get_cloud(distro) ntpconfig = self._mock_ntp_client_config( - client=client, distro=distro + tmpdir, client=client, distro=distro ) confpath = ntpconfig["confpath"] m_select.return_value = ntpconfig mock_path = "cloudinit.config.cc_ntp.temp_utils.get_tmp_ancestor" - with mock.patch(mock_path, lambda *_: self.new_root): - cc_ntp.handle("notimportant", {"ntp": cfg}, mycloud, None) - self.assertEqual( - "servers []\npools ['mypool.org']\n%s" % custom, - util.load_text_file(confpath), + with mock.patch(mock_path, lambda *_: tmpdir), mock.patch.object( + mycloud.distro, "manage_service" + ): + cc_ntp.handle("notimportant", {"ntp": cfg}, mycloud, []) + assert ( + "servers []\npools ['mypool.org']\n%s" % custom + == util.load_text_file(confpath) ) m_schema.assert_called_with(expected_merged_cfg) -class TestSupplementalSchemaValidation(CiTestCase): +class TestSupplementalSchemaValidation: def test_error_on_missing_keys(self): """ValueError raised reporting any missing required ntp:config keys""" - cfg = {} match = ( r"Invalid ntp configuration:\\nMissing required ntp:config" " keys: check_exe, confpath, packages, service_name" ) - with self.assertRaisesRegex(ValueError, match): - cc_ntp.supplemental_schema_validation(cfg) + with pytest.raises(ValueError, match=match): + cc_ntp.supplemental_schema_validation({}) def test_error_requiring_either_template_or_template_name(self): """ValueError raised if both template not template_name are None.""" - cfg = { + cfg: Dict[str, Any] = { "confpath": "someconf", "check_exe": "", "service_name": "", @@ -743,7 +728,7 @@ def test_error_requiring_either_template_or_template_name(self): r"Invalid ntp configuration:\\nEither ntp:config:template" " or ntp:config:template_name values are required" ) - with self.assertRaisesRegex(ValueError, match): + with pytest.raises(ValueError, match=match): cc_ntp.supplemental_schema_validation(cfg) def test_error_on_non_list_values(self): @@ -760,7 +745,7 @@ def test_error_on_non_list_values(self): r"Invalid ntp configuration:\\nExpected a list of required" " package names for ntp:config:packages. Found \\(NOPE\\)" ) - with self.assertRaisesRegex(ValueError, match): + with pytest.raises(ValueError, match=match): cc_ntp.supplemental_schema_validation(cfg) def test_error_on_non_string_values(self): @@ -780,11 +765,11 @@ def test_error_on_non_string_values(self): "Expected a string type for ntp:config:template. Found (4)", "Expected a string type for ntp:config:template_name. Found (5)", ] - with self.assertRaises(ValueError) as context_mgr: + with pytest.raises(ValueError) as context_mgr: cc_ntp.supplemental_schema_validation(cfg) - error_msg = str(context_mgr.exception) + error_msg = str(context_mgr.value) for error in errors: - self.assertIn(error, error_msg) + assert error in error_msg class TestNTPSchema: diff --git a/tests/unittests/config/test_cc_set_hostname.py b/tests/unittests/config/test_cc_set_hostname.py index 16c8bd76ac1..e336828638b 100644 --- a/tests/unittests/config/test_cc_set_hostname.py +++ b/tests/unittests/config/test_cc_set_hostname.py @@ -1,114 +1,111 @@ # This file is part of cloud-init. See LICENSE file for license information. import logging -import os -import shutil -import tempfile -from io import BytesIO from pathlib import Path from unittest import mock +import pytest from configobj import ConfigObj -from cloudinit import cloud, distros, helpers, util +from cloudinit import util from cloudinit.config import cc_set_hostname -from cloudinit.sources import DataSourceNone -from tests.unittests import helpers as t_help +from cloudinit.sources.DataSourceNone import DataSourceNone +from tests.unittests.util import get_cloud LOG = logging.getLogger(__name__) -class TestHostname(t_help.FilesystemMockingTestCase): +def fake_subp(*args, **kwargs): + if args[0][0] in ["hostname", "hostnamectl"]: + return None, None + raise RuntimeError(f"Unexpected subp: {args[0]}") - with_logs = True - def setUp(self): - super(TestHostname, self).setUp() - self.tmp = tempfile.mkdtemp() - util.ensure_dir(os.path.join(self.tmp, "data")) - self.addCleanup(shutil.rmtree, self.tmp) +def conf_parser(conf): + return dict(ConfigObj(conf.splitlines())) - def _fetch_distro(self, kind, conf=None): - cls = distros.fetch(kind) - paths = helpers.Paths({"cloud_dir": self.tmp}) - conf = {} if conf is None else conf - return cls(kind, conf, paths) - def test_debian_write_hostname_prefer_fqdn(self): - cfg = { - "hostname": "blah", - "prefer_fqdn_over_hostname": True, - "fqdn": "blah.yahoo.com", - } - distro = self._fetch_distro("debian", cfg) - paths = helpers.Paths({"cloud_dir": self.tmp}) - ds = None - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) - cc_set_hostname.handle("cc_set_hostname", cfg, cc, []) - contents = util.load_text_file("/etc/hostname") - self.assertEqual("blah.yahoo.com", contents.strip()) - - @mock.patch("cloudinit.distros.Distro.uses_systemd", return_value=False) - def test_rhel_write_hostname_prefer_hostname(self, m_uses_systemd): - cfg = { - "hostname": "blah", - "prefer_fqdn_over_hostname": False, - "fqdn": "blah.yahoo.com", - } - distro = self._fetch_distro("rhel", cfg) - paths = helpers.Paths({"cloud_dir": self.tmp}) - ds = None - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) - cc_set_hostname.handle("cc_set_hostname", cfg, cc, []) - contents = util.load_binary_file("/etc/sysconfig/network") - n_cfg = ConfigObj(BytesIO(contents)) - self.assertEqual({"HOSTNAME": "blah"}, dict(n_cfg)) - - @mock.patch("cloudinit.distros.Distro.uses_systemd", return_value=False) - def test_write_hostname_rhel(self, m_uses_systemd): - cfg = {"hostname": "blah", "fqdn": "blah.blah.blah.yahoo.com"} - distro = self._fetch_distro("rhel") - paths = helpers.Paths({"cloud_dir": self.tmp}) - ds = None - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) - cc_set_hostname.handle("cc_set_hostname", cfg, cc, []) - contents = util.load_binary_file("/etc/sysconfig/network") - n_cfg = ConfigObj(BytesIO(contents)) - self.assertEqual({"HOSTNAME": "blah.blah.blah.yahoo.com"}, dict(n_cfg)) - - def test_write_hostname_debian(self): - cfg = { - "hostname": "blah", - "fqdn": "blah.blah.blah.yahoo.com", - } - distro = self._fetch_distro("debian") - paths = helpers.Paths({"cloud_dir": self.tmp}) - ds = None - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) - cc_set_hostname.handle("cc_set_hostname", cfg, cc, []) - contents = util.load_text_file("/etc/hostname") - self.assertEqual("blah", contents.strip()) +@pytest.mark.usefixtures("fake_filesystem") +class TestHostname: + @pytest.fixture(autouse=True) + def common_mocks(self, mocker): + mocker.patch( + "cloudinit.distros.debian.subp.subp", side_effect=fake_subp + ) - @mock.patch("cloudinit.distros.Distro.uses_systemd", return_value=False) - def test_write_hostname_sles(self, m_uses_systemd): - cfg = { - "hostname": "blah.blah.blah.suse.com", - } - distro = self._fetch_distro("sles") - paths = helpers.Paths({"cloud_dir": self.tmp}) - ds = None - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) + @pytest.mark.parametrize( + "distro_name,cfg,host_path,parser,expected", + ( + pytest.param( + "debian", + {"hostname": "blah", "fqdn": "blah.example.com"}, + "/etc/hostname", + lambda x: x, + "blah", + id="debian", + ), + pytest.param( + "debian", + { + "hostname": "blah", + "prefer_fqdn_over_hostname": True, + "fqdn": "blah.example.com", + }, + "/etc/hostname", + lambda x: x, + "blah.example.com", + id="debian_prefer_fqdn", + ), + pytest.param( + "rhel", + {"hostname": "blah", "fqdn": "blah.example.com"}, + "/etc/sysconfig/network", + conf_parser, + {"HOSTNAME": "blah.example.com"}, + id="rhel", + ), + pytest.param( + "rhel", + { + "hostname": "blah", + "prefer_fqdn_over_hostname": False, + "fqdn": "blah.example.com", + }, + "/etc/sysconfig/network", + conf_parser, + {"HOSTNAME": "blah"}, + id="rhel_prefer_hostname", + ), + pytest.param( + "sles", + {"hostname": "blah", "fqdn": "blah.example.com"}, + "/etc/HOSTNAME", + lambda x: x, + "blah", + id="sles", + ), + ), + ) + def test_write_hostname( + self, + distro_name, + cfg, + host_path, + parser, + expected, + paths, + mocker, + ): + mocker.patch( + "cloudinit.distros.Distro.uses_systemd", return_value=False + ) + cc = get_cloud(distro=distro_name, paths=paths, sys_cfg=cfg) cc_set_hostname.handle("cc_set_hostname", cfg, cc, []) - contents = util.load_text_file(distro.hostname_conf_fn) - self.assertEqual("blah", contents.strip()) + contents = util.load_text_file(host_path).strip() + assert expected == parser(contents) @mock.patch("cloudinit.distros.photon.subp.subp") - def test_photon_hostname(self, m_subp): + def test_photon_hostname(self, m_subp, paths): cfg1 = { "hostname": "photon", "prefer_fqdn_over_hostname": True, @@ -120,11 +117,8 @@ def test_photon_hostname(self, m_subp): "fqdn": "test2.vmware.com", } - ds = None m_subp.return_value = (None, None) - distro = self._fetch_distro("photon", cfg1) - paths = helpers.Paths({"cloud_dir": self.tmp}) - cc = cloud.Cloud(ds, paths, {}, distro, None) + cc = get_cloud(distro="photon", paths=paths, sys_cfg=cfg1) for c in [cfg1, cfg2]: cc_set_hostname.handle("cc_set_hostname", c, cc, []) print("\n", m_subp.call_args_list) @@ -156,62 +150,49 @@ def test_photon_hostname(self, m_subp): ] not in m_subp.call_args_list @mock.patch("cloudinit.util.get_hostname", return_value="localhost") - def test_multiple_calls_skips_unchanged_hostname(self, get_hostname): + def test_multiple_calls_skips_unchanged_hostname( + self, get_hostname, paths, caplog + ): """Only new hostname or fqdn values will generate a hostname call.""" - distro = self._fetch_distro("debian") - paths = helpers.Paths({"cloud_dir": self.tmp}) - ds = None - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) + cc = get_cloud(distro="debian", paths=paths) cc_set_hostname.handle( "cc_set_hostname", {"hostname": "hostname1.me.com"}, cc, [] ) contents = util.load_text_file("/etc/hostname") - self.assertEqual("hostname1", contents.strip()) + assert "hostname1" == contents.strip() cc_set_hostname.handle( "cc_set_hostname", {"hostname": "hostname1.me.com"}, cc, [] ) - self.assertIn( - "DEBUG: No hostname changes. Skipping set_hostname\n", - self.logs.getvalue(), - ) + assert "No hostname changes. Skipping set_hostname\n" in caplog.text cc_set_hostname.handle( "cc_set_hostname", {"hostname": "hostname2.me.com"}, cc, [] ) contents = util.load_text_file("/etc/hostname") - self.assertEqual("hostname2", contents.strip()) - self.assertIn( - "Non-persistently setting the system hostname to hostname2", - self.logs.getvalue(), + assert "hostname2" == contents.strip() + assert ( + "Non-persistently setting the system hostname to hostname2" + in caplog.text ) @mock.patch("cloudinit.util.get_hostname", return_value="localhost") - def test_localhost_default_hostname(self, get_hostname): + def test_localhost_default_hostname(self, get_hostname, paths): """ No hostname set. Default value returned is localhost, but we shouldn't write it in /etc/hostname """ - distro = self._fetch_distro("debian") - paths = helpers.Paths({"cloud_dir": self.tmp}) - ds = DataSourceNone.DataSourceNone({}, None, paths) - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) + cc = get_cloud(distro="debian", paths=paths, ds=DataSourceNone) util.write_file("/etc/hostname", "") cc_set_hostname.handle("cc_set_hostname", {}, cc, []) contents = util.load_text_file("/etc/hostname") - self.assertEqual("", contents.strip()) + assert "" == contents.strip() @mock.patch("cloudinit.util.get_hostname", return_value="localhost") - def test_localhost_user_given_hostname(self, get_hostname): + def test_localhost_user_given_hostname(self, get_hostname, paths): """ User set hostname is localhost. We should write it in /etc/hostname """ - distro = self._fetch_distro("debian") - paths = helpers.Paths({"cloud_dir": self.tmp}) - ds = DataSourceNone.DataSourceNone({}, None, paths) - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) + cc = get_cloud(distro="debian", paths=paths, ds=DataSourceNone) # user-provided localhost should not be ignored util.write_file("/etc/hostname", "") @@ -219,132 +200,56 @@ def test_localhost_user_given_hostname(self, get_hostname): "cc_set_hostname", {"hostname": "localhost"}, cc, [] ) contents = util.load_text_file("/etc/hostname") - self.assertEqual("localhost", contents.strip()) + assert "localhost" == contents.strip() - def test_error_on_distro_set_hostname_errors(self): + def test_error_on_distro_set_hostname_errors(self, paths): """Raise SetHostnameError on exceptions from distro.set_hostname.""" - distro = self._fetch_distro("debian") - def set_hostname_error(hostname, fqdn): - raise RuntimeError("OOPS on: %s" % fqdn) + def set_hostname_error(hostname, fqdn=None) -> None: + raise RuntimeError(f"OOPS on: {fqdn}") - distro.set_hostname = set_hostname_error - paths = helpers.Paths({"cloud_dir": self.tmp}) - ds = None - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) - with self.assertRaises(cc_set_hostname.SetHostnameError) as ctx_mgr: + cc = get_cloud(distro="debian", paths=paths) + cc.distro.set_hostname = set_hostname_error + with pytest.raises(cc_set_hostname.SetHostnameError) as exc_info: cc_set_hostname.handle( "somename", {"hostname": "hostname1.me.com"}, cc, [] ) - self.assertEqual( + assert ( "Failed to set the hostname to hostname1.me.com (hostname1):" - " OOPS on: hostname1.me.com", - str(ctx_mgr.exception), + " OOPS on: hostname1.me.com" == str(exc_info.value) ) - def test_ignore_empty_previous_artifact_file(self): + def test_ignore_empty_previous_artifact_file(self, paths): cfg = { "hostname": "blah", "fqdn": "blah.blah.blah.yahoo.com", } - distro = self._fetch_distro("debian") - paths = helpers.Paths({"cloud_dir": self.tmp}) - ds = None - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) + cc = get_cloud(distro="debian", paths=paths) prev_fn = Path(cc.get_cpath("data")) / "set-hostname" + prev_fn.parent.mkdir() prev_fn.touch() cc_set_hostname.handle("cc_set_hostname", cfg, cc, []) contents = util.load_text_file("/etc/hostname") - self.assertEqual("blah", contents.strip()) - - def test_create_hostname_file_false(self): - cfg = { - "hostname": "foo", - "fqdn": "foo.blah.yahoo.com", - "create_hostname_file": False, - } - distro = self._fetch_distro("debian") - paths = helpers.Paths({"cloud_dir": self.tmp}) - ds = None - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) - cc_set_hostname.handle("cc_set_hostname", cfg, cc, []) - with self.assertRaises(FileNotFoundError): - util.load_text_file("/etc/hostname") - - def test_create_hostname_file_false_arch(self): - cfg = { - "hostname": "foo", - "fqdn": "foo.blah.yahoo.com", - "create_hostname_file": False, - } - distro = self._fetch_distro("arch") - paths = helpers.Paths({"cloud_dir": self.tmp}) - ds = None - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) - cc_set_hostname.handle("cc_set_hostname", cfg, cc, []) - with self.assertRaises(FileNotFoundError): - util.load_text_file("/etc/hostname") - - def test_create_hostname_file_false_alpine(self): - cfg = { - "hostname": "foo", - "fqdn": "foo.blah.yahoo.com", - "create_hostname_file": False, - } - distro = self._fetch_distro("alpine") - paths = helpers.Paths({"cloud_dir": self.tmp}) - ds = None - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) - cc_set_hostname.handle("cc_set_hostname", cfg, cc, []) - with self.assertRaises(FileNotFoundError): - util.load_text_file("/etc/hostname") - - def test_create_hostname_file_false_gentoo(self): - cfg = { - "hostname": "foo", - "fqdn": "foo.blah.yahoo.com", - "create_hostname_file": False, - } - distro = self._fetch_distro("gentoo") - paths = helpers.Paths({"cloud_dir": self.tmp}) - ds = None - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) - cc_set_hostname.handle("cc_set_hostname", cfg, cc, []) - with self.assertRaises(FileNotFoundError): - util.load_text_file("/etc/hostname") - - def test_create_hostname_file_false_photon(self): - cfg = { - "hostname": "foo", - "fqdn": "foo.blah.yahoo.com", - "create_hostname_file": False, - } - distro = self._fetch_distro("photon") - paths = helpers.Paths({"cloud_dir": self.tmp}) - ds = None - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) - cc_set_hostname.handle("cc_set_hostname", cfg, cc, []) - with self.assertRaises(FileNotFoundError): - util.load_text_file("/etc/hostname") - - def test_create_hostname_file_false_rhel(self): + assert "blah" == contents.strip() + + @pytest.mark.parametrize( + "distro_name", + ( + "debian", + "arch", + "alpine", + "gentoo", + "photon", + "rhel", + ), + ) + def test_create_hostname_file_false(self, distro_name, paths): cfg = { "hostname": "foo", "fqdn": "foo.blah.yahoo.com", "create_hostname_file": False, } - distro = self._fetch_distro("rhel") - paths = helpers.Paths({"cloud_dir": self.tmp}) - ds = None - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) + cc = get_cloud(distro=distro_name, paths=paths) cc_set_hostname.handle("cc_set_hostname", cfg, cc, []) - with self.assertRaises(FileNotFoundError): + with pytest.raises(FileNotFoundError): util.load_text_file("/etc/hostname") diff --git a/tests/unittests/util.py b/tests/unittests/util.py index c779ae56076..02aa6b1ac97 100644 --- a/tests/unittests/util.py +++ b/tests/unittests/util.py @@ -1,14 +1,33 @@ # This file is part of cloud-init. See LICENSE file for license information. +from typing import Optional, Type from unittest import mock from cloudinit import cloud, distros, helpers +from cloudinit.config import Config from cloudinit.net.dhcp import IscDhclient from cloudinit.sources import DataSource, DataSourceHostname from cloudinit.sources.DataSourceNone import DataSourceNone +class DataSourceTesting(DataSourceNone): + def get_hostname(self, fqdn=False, resolve_ip=False, metadata_only=False): + return DataSourceHostname("hostname", False) + + def persist_instance_data(self): + return True + + @property + def cloud_name(self): + return "testing" + + def get_cloud( - distro=None, paths=None, sys_cfg=None, metadata=None, mocked_distro=False + distro: Optional[str] = None, + paths: Optional[helpers.Paths] = None, + sys_cfg: Optional[Config] = None, + metadata=None, + mocked_distro=False, + ds: Type[DataSource] = DataSourceTesting, ): """Obtain a "cloud" that can be used for testing. @@ -21,14 +40,18 @@ def get_cloud( """ paths = paths or helpers.Paths({}) sys_cfg = sys_cfg or {} - cls = distros.fetch(distro) if distro else MockDistro + if distro: + distro_cls: Type[distros.Distro] = distros.fetch(distro) + else: + distro = "testingdistro" + distro_cls = MockDistro # *BSD calls platform.system to determine osfamilies osfamily = distro.lower() if distro else "ubuntu" with mock.patch("platform.system", return_value=osfamily): - mydist = cls(distro, sys_cfg, paths) + mydist = distro_cls(distro, sys_cfg, paths) if mocked_distro: mydist = mock.MagicMock(wraps=mydist) - myds = DataSourceTesting(sys_cfg, mydist, paths) + myds = ds(sys_cfg, mydist, paths) if metadata: myds.metadata.update(metadata) if paths: @@ -48,18 +71,6 @@ class concreteCls(abclass): return type("DummyConcrete" + abclass.__name__, (concreteCls,), {}) -class DataSourceTesting(DataSourceNone): - def get_hostname(self, fqdn=False, resolve_ip=False, metadata_only=False): - return DataSourceHostname("hostname", False) - - def persist_instance_data(self): - return True - - @property - def cloud_name(self): - return "testing" - - class MockDistro(distros.Distro): # MockDistro is here to test base Distro class implementations def __init__(self, name="testingdistro", cfg=None, paths=None):