Skip to content

Commit

Permalink
Fix typing issues in subp module (#4401)
Browse files Browse the repository at this point in the history
Add the 'SubpResult' return type from subp and fix some associated
typing issues

Additionally convert to using f-strings, use if expressions rather
than if blocks where applicable, and enforce subp kwargs
  • Loading branch information
TheRealFalcon authored Sep 7, 2023
1 parent fd214a1 commit 436e671
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 63 deletions.
2 changes: 1 addition & 1 deletion cloudinit/config/cc_ca_certs.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def disable_default_ca_certs(distro_name, distro_cfg):
debconf_sel = (
"ca-certificates ca-certificates/trust_new_crts " + "select no"
)
subp.subp(("debconf-set-selections", "-"), debconf_sel)
subp.subp(("debconf-set-selections", "-"), data=debconf_sel)


def disable_system_ca_certs(distro_cfg):
Expand Down
2 changes: 1 addition & 1 deletion cloudinit/config/cc_grub_dpkg.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def handle(name: str, cfg: Config, cloud: Cloud, args: list) -> None:
LOG.debug("Setting grub debconf-set-selections with '%s'", dconf_sel)

try:
subp.subp(["debconf-set-selections"], dconf_sel)
subp.subp(["debconf-set-selections"], data=dconf_sel)
except Exception as e:
util.logexc(
LOG, "Failed to run debconf-set-selections for grub_dpkg: %s", e
Expand Down
3 changes: 1 addition & 2 deletions cloudinit/config/cc_lxd.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ def handle(name: str, cfg: Config, cloud: Cloud, args: list) -> None:
return
# Set up lxd if init config is given
if init_cfg:

# type is known, number of elements is not
# in the case of the ubuntu+lvm backend workaround
init_keys: Tuple[str, ...] = (
Expand Down Expand Up @@ -305,7 +304,7 @@ def handle(name: str, cfg: Config, cloud: Cloud, args: list) -> None:
)
+ "\n"
)
subp.subp(["debconf-communicate"], data)
subp.subp(["debconf-communicate"], data=data)
except Exception:
util.logexc(
LOG, "Failed to run '%s' for lxd with" % dconf_comm
Expand Down
6 changes: 4 additions & 2 deletions cloudinit/distros/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,9 @@ def set_passwd(self, user, passwd, hashed=False):
cmd.append("-e")

try:
subp.subp(cmd, pass_string, logstring="chpasswd for %s" % user)
subp.subp(
cmd, data=pass_string, logstring="chpasswd for %s" % user
)
except Exception as e:
util.logexc(LOG, "Failed to set password for %s", user)
raise e
Expand All @@ -796,7 +798,7 @@ def chpasswd(self, plist_in: list, hashed: bool):
+ "\n"
)
cmd = ["chpasswd"] + (["-e"] if hashed else [])
subp.subp(cmd, payload)
subp.subp(cmd, data=payload)

def is_doas_rule_valid(self, user, rule):
rule_pattern = (
Expand Down
94 changes: 39 additions & 55 deletions cloudinit/subp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import os
import subprocess
from errno import ENOEXEC
from io import TextIOWrapper
from typing import List, Union

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -39,7 +41,7 @@ def prepend_base_command(base_command, commands):
elif command[0] != base_command: # Automatically prepend
command.insert(0, base_command)
elif isinstance(command, str):
if not command.startswith("%s " % base_command):
if not command.startswith(f"{base_command} "):
warnings.append(command)
else:
errors.append(str(command))
Expand All @@ -64,7 +66,6 @@ def prepend_base_command(base_command, commands):


class ProcessExecutionError(IOError):

MESSAGE_TMPL = (
"%(description)s\n"
"Command: %(cmd)s\n"
Expand All @@ -85,46 +86,33 @@ def __init__(
reason=None,
errno=None,
):
if not cmd:
self.cmd = self.empty_attr
else:
self.cmd = cmd
self.cmd = cmd or self.empty_attr

if not description:
if not exit_code and errno == ENOEXEC:
self.description = "Exec format error. Missing #! in script?"
else:
self.description = "Unexpected error while running command."
else:
if description:
self.description = description

if not isinstance(exit_code, int):
self.exit_code = self.empty_attr
elif not exit_code and errno == ENOEXEC:
self.description = "Exec format error. Missing #! in script?"
else:
self.exit_code = exit_code
self.description = "Unexpected error while running command."

self.exit_code = (
exit_code if isinstance(exit_code, int) else self.empty_attr
)

if not stderr:
if stderr is None:
self.stderr = self.empty_attr
else:
self.stderr = stderr
self.stderr = self.empty_attr if stderr is None else stderr
else:
self.stderr = self._indent_text(stderr)

if not stdout:
if stdout is None:
self.stdout = self.empty_attr
else:
self.stdout = stdout
self.stdout = self.empty_attr if stdout is None else stdout
else:
self.stdout = self._indent_text(stdout)

if reason:
self.reason = reason
else:
self.reason = self.empty_attr
self.reason = reason or self.empty_attr

self.errno = errno
if errno:
self.errno = errno
message = self.MESSAGE_TMPL % {
"description": self._ensure_string(self.description),
"cmd": self._ensure_string(self.cmd),
Expand All @@ -141,23 +129,23 @@ def _ensure_string(self, text):
"""
return text.decode() if isinstance(text, bytes) else text

def _indent_text(self, text, indent_level=8):
def _indent_text(
self, text: Union[str, bytes], indent_level=8
) -> Union[str, bytes]:
"""
indent text on all but the first line, allowing for easy to read output
remove any newlines at end of text first to prevent unneeded blank
line in output
"""
cr = "\n"
indent = " " * indent_level
# if input is bytes, return bytes
if isinstance(text, bytes):
cr = cr.encode()
indent = indent.encode()
# remove any newlines at end of text first to prevent unneeded blank
# line in output
return text.rstrip(cr).replace(cr, cr + indent)
if not isinstance(text, bytes):
return text.rstrip("\n").replace("\n", "\n" + " " * indent_level)
return text.rstrip(b"\n").replace(b"\n", b"\n" + b" " * indent_level)


def subp(
args,
*,
data=None,
rcs=None,
env=None,
Expand All @@ -170,7 +158,7 @@ def subp(
update_env=None,
status_cb=None,
cwd=None,
):
) -> SubpResult:
"""Run a subprocess.
:param args: command to run in a list. [cmd, arg1, arg2...]
Expand Down Expand Up @@ -257,7 +245,7 @@ def subp(
logstring,
)

stdin = None
stdin: Union[TextIOWrapper, int]
stdout = None
stderr = None
if capture:
Expand All @@ -279,6 +267,7 @@ def subp(
# Popen converts entries in the arguments array from non-bytes to bytes.
# When locale is unset it may use ascii for that encoding which can
# cause UnicodeDecodeErrors. (LP: #1751051)
bytes_args: Union[bytes, List[bytes]]
if isinstance(args, bytes):
bytes_args = args
elif isinstance(args, str):
Expand Down Expand Up @@ -321,9 +310,7 @@ def subp(
if decode:

def ldecode(data, m="utf-8"):
if not isinstance(data, bytes):
return data
return data.decode(m, decode)
return data.decode(m, decode) if isinstance(data, bytes) else data

out = ldecode(out)
err = ldecode(err)
Expand All @@ -345,7 +332,7 @@ def target_path(target, path=None):
if target in (None, ""):
target = "/"
elif not isinstance(target, str):
raise ValueError("Unexpected input for target: %s" % target)
raise ValueError(f"Unexpected input for target: {target}")
else:
target = os.path.abspath(target)
# abspath("//") returns "//" specifically for 2 slashes.
Expand All @@ -365,22 +352,19 @@ def target_path(target, path=None):
def which(program, search=None, target=None):
target = target_path(target)

if os.path.sep in program:
if os.path.sep in program and is_exe(target_path(target, program)):
# if program had a '/' in it, then do not search PATH
# 'which' does consider cwd here. (cd / && which bin/ls) = bin/ls
# so effectively we set cwd to / (or target)
if is_exe(target_path(target, program)):
return program
return program

if search is None:
paths = [
p.strip('"') for p in os.environ.get("PATH", "").split(os.pathsep)
]
if target == "/":
search = paths
else:
search = [p for p in paths if p.startswith("/")]

search = (
paths if target == "/" else [p for p in paths if p.startswith("/")]
)
# normalize path input
search = [os.path.abspath(p) for p in search]

Expand Down Expand Up @@ -425,8 +409,8 @@ def runparts(dirp, skip_no_exist=True, exe_prefix=None):

if failed and attempted:
raise RuntimeError(
"Runparts: %s failures (%s) in %s attempted commands"
% (len(failed), ",".join(failed), len(attempted))
f'Runparts: {len(failed)} failures ({",".join(failed)}) in '
f"{len(attempted)} attempted commands"
)


Expand Down
6 changes: 4 additions & 2 deletions tests/unittests/config/test_cc_ca_certs.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,10 @@ def test_commands(self):
if distro_name in ["debian", "ubuntu"]:
mock_subp.assert_called_once_with(
("debconf-set-selections", "-"),
"ca-certificates ca-certificates/trust_new_crts"
" select no",
data=(
"ca-certificates ca-certificates/"
"trust_new_crts select no"
),
)
else:
assert mock_subp.call_count == 0
Expand Down

0 comments on commit 436e671

Please sign in to comment.