From 53eb8555e091474803b724700815adc09aa84f05 Mon Sep 17 00:00:00 2001 From: Brett Holman Date: Wed, 3 Jan 2024 09:11:21 -0700 Subject: [PATCH] fix(dhcp): Guard against FileNotFoundError and NameError exceptions --- cloudinit/net/dhcp.py | 20 ++++++++++------ tests/unittests/net/test_dhcp.py | 40 ++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 7 deletions(-) diff --git a/cloudinit/net/dhcp.py b/cloudinit/net/dhcp.py index 5441ade7670..ed5b47479f6 100644 --- a/cloudinit/net/dhcp.py +++ b/cloudinit/net/dhcp.py @@ -5,15 +5,15 @@ # This file is part of cloud-init. See LICENSE file for license information. import abc -import contextlib import glob import logging import os import re import signal import time +from contextlib import suppress from io import StringIO -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional import configobj @@ -268,7 +268,7 @@ def dhcp_discovery( # this function waits for these files to exist, clean previous runs # to avoid false positive in wait_for_files - with contextlib.suppress(FileNotFoundError): + with suppress(FileNotFoundError): os.remove(pid_file) os.remove(lease_file) @@ -514,9 +514,15 @@ def get_latest_lease(lease_d=None): return latest_file @staticmethod - def parse_dhcp_server_from_lease_file(lease_file): - with open(lease_file, "r") as fd: - for line in fd: + def parse_dhcp_server_from_lease_file(lease_file) -> Optional[str]: + """Parse a lease file for the dhcp server address + + @param lease_file: Name of a file to be parsed + @return: An address if found, or None + """ + latest_address = None + with suppress(FileNotFoundError), open(lease_file, "r") as file: + for line in file: if "dhcp-server-identifier" in line: words = line.strip(" ;\r\n").split(" ") if len(words) > 2: @@ -561,7 +567,7 @@ def dhcp_discovery( tmp_dir = temp_utils.get_tmp_ancestor(needs_exe=True) lease_file = os.path.join(tmp_dir, interface + ".lease.json") - with contextlib.suppress(FileNotFoundError): + with suppress(FileNotFoundError): os.remove(lease_file) # udhcpc needs the interface up to send initial discovery packets diff --git a/tests/unittests/net/test_dhcp.py b/tests/unittests/net/test_dhcp.py index a7b62312757..8ec96eef0fc 100644 --- a/tests/unittests/net/test_dhcp.py +++ b/tests/unittests/net/test_dhcp.py @@ -32,6 +32,46 @@ DHCLIENT = "/sbin/dhclient" +@pytest.mark.parametrize( + "server_address,lease_file_content", + ( + pytest.param(None, None, id="no_server_addr_on_absent_lease_file"), + pytest.param(None, "", id="no_server_addr_on_empty_lease_file"), + pytest.param( + None, + "lease {\n fixed-address: 10.1.2.3;\n}\n", + id="no_server_addr_when_no_server_ident", + ), + pytest.param( + "10.4.5.6", + "lease {\n fixed-address: 10.1.2.3;\n" + " option dhcp-server-identifier 10.4.5.6;\n" + " option dhcp-renewal-time 1800;\n}\n", + id="server_addr_found_when_server_ident_present", + ), + ), +) +class TestParseDHCPServerFromLeaseFile: + def test_find_server_address_when_present( + self, server_address, lease_file_content, tmp_path + ): + """Test that we return None in the case of no file or file contains no + server address, otherwise return the address. + """ + lease_file = tmp_path / "dhcp.leases" + if server_address: + if lease_file_content: + lease_file.write_text(lease_file_content) + assert ( + server_address + == IscDhclient.parse_dhcp_server_from_lease_file(lease_file) + ) + else: + assert not IscDhclient.parse_dhcp_server_from_lease_file( + lease_file + ) + + class TestParseDHCPLeasesFile(CiTestCase): def test_parse_empty_lease_file_errors(self): """parse_dhcp_lease_file errors when file content is empty."""