From ac57534faaac2ed7962a28fbb1474e1f487c645b Mon Sep 17 00:00:00 2001 From: Matt Davis Date: Sat, 19 Oct 2024 19:45:51 -0400 Subject: [PATCH] Refactor do_check routine to be more modular and address concerns about the quiet flag yielding too much output --- pipenv/routines/check.py | 318 ++++++++++++++++++++------------------- 1 file changed, 162 insertions(+), 156 deletions(-) diff --git a/pipenv/routines/check.py b/pipenv/routines/check.py index 4f2ab27fc1..c6da10b1ef 100644 --- a/pipenv/routines/check.py +++ b/pipenv/routines/check.py @@ -1,19 +1,21 @@ import io -import json as simplejson +import json import logging import os import sys import tempfile +from contextlib import redirect_stderr, redirect_stdout from pathlib import Path -from pipenv import exceptions, pep508checker +from pipenv import pep508checker +from pipenv.patched.safety.cli import cli from pipenv.utils.processes import run_command from pipenv.utils.project import ensure_project -from pipenv.utils.shell import cmd_list_to_shell, project_python +from pipenv.utils.shell import project_python from pipenv.vendor import click, plette -def build_options( +def build_safety_options( audit_and_monitor=True, exit_code=True, output="screen", @@ -47,6 +49,133 @@ def build_options( return options +def run_pep508_check(project, system, python): + pep508checker_path = pep508checker.__file__.rstrip("cdo") + cmd = [project_python(project, system=system), Path(pep508checker_path).as_posix()] + c = run_command(cmd, is_verbose=project.s.is_verbose()) + + if c.returncode is not None: + try: + return json.loads(c.stdout.strip()) + except json.JSONDecodeError: + click.echo( + f"Failed parsing pep508 results:\n{c.stdout.strip()}\n{c.stderr.strip()}", + err=True, + ) + sys.exit(1) + return {} + + +def check_pep508_requirements(project, results, quiet): + p = plette.Pipfile.load(open(project.pipfile_location)) + p = plette.Lockfile.with_meta_from(p) + failed = False + + for marker, specifier in p._data["_meta"]["requires"].items(): + if marker in results: + if results[marker] != specifier: + failed = True + click.echo( + "Specifier {} does not match {} ({})." + "".format( + click.style(marker, fg="green"), + click.style(specifier, fg="cyan"), + click.style(results[marker], fg="yellow"), + ), + err=True, + ) + + if failed: + click.secho("Failed!", fg="red", err=True) + sys.exit(1) + elif not quiet and not project.s.is_quiet(): + click.secho("Passed!", fg="green") + + +def get_requirements(project, use_installed, categories): + _cmd = [project_python(project, system=False)] + if use_installed: + return run_command( + _cmd + ["-m", "pip", "list", "--format=freeze"], + is_verbose=project.s.is_verbose(), + ) + elif categories: + return run_command( + ["pipenv", "requirements", "--categories", categories], + is_verbose=project.s.is_verbose(), + ) + else: + return run_command(["pipenv", "requirements"], is_verbose=project.s.is_verbose()) + + +def create_temp_requirements(project, requirements): + temp_requirements = tempfile.NamedTemporaryFile( + mode="w+", + prefix=f"{project.virtualenv_name}", + suffix="_requirements.txt", + delete=False, + ) + temp_requirements.write(requirements.stdout.strip()) + temp_requirements.close() + return temp_requirements + + +def run_safety_check(cmd, quiet): + sys.argv = cmd[1:] + + if quiet: + out = io.StringIO() + err = io.StringIO() + exit_code = 0 + with redirect_stdout(out), redirect_stderr(err): + try: + cli(prog_name="pipenv") + except SystemExit as exit_signal: + exit_code = exit_signal.code + return out.getvalue(), err.getvalue(), exit_code + else: + cli(prog_name="pipenv") + + +def parse_safety_output(output, quiet): + try: + json_report = json.loads(output) + meta = json_report.get("report_meta", {}) + vulnerabilities_found = meta.get("vulnerabilities_found", 0) + db_type = "commercial" if meta.get("api_key", False) else "free" + + if quiet: + click.secho( + f"{vulnerabilities_found} vulnerabilities found.", + fg="red" if vulnerabilities_found else "green", + ) + else: + fg = "red" if vulnerabilities_found else "green" + message = f"Scan complete using Safety's {db_type} vulnerability database." + click.echo() + click.secho(f"{vulnerabilities_found} vulnerabilities found.", fg=fg) + click.echo() + + for vuln in json_report.get("vulnerabilities", []): + click.echo( + "{}: {} {} open to vulnerability {} ({}). More info: {}".format( + click.style(vuln["vulnerability_id"], bold=True, fg="red"), + click.style(vuln["package_name"], fg="green"), + click.style(vuln["analyzed_version"], fg="yellow", bold=True), + click.style(vuln["vulnerability_id"], bold=True), + click.style(vuln["vulnerable_spec"], fg="yellow", bold=False), + click.style(vuln["more_info_url"], bold=True), + ) + ) + click.echo(f"{vuln['advisory']}") + click.echo() + + click.secho(message, fg="white", bold=True) + + except json.JSONDecodeError: + click.echo("Failed to parse Safety output.") + + def do_check( project, python=False, @@ -66,13 +195,10 @@ def do_check( use_installed=False, categories="", ): - import json - if not verbose: - logging.getLogger("pipenv").setLevel(logging.WARN) + logging.getLogger("pipenv").setLevel(logging.ERROR if quiet else logging.WARN) if not system: - # Ensure that virtualenv is available. ensure_project( project, python=python, @@ -80,62 +206,16 @@ def do_check( warn=False, pypi_mirror=pypi_mirror, ) + if not quiet and not project.s.is_quiet(): click.secho("Checking PEP 508 requirements...", bold=True) - pep508checker_path = pep508checker.__file__.rstrip("cdo") - safety_path = os.path.join( - os.path.dirname(os.path.abspath(__file__)), "patched", "safety" - ) - _cmd = [project_python(project, system=system)] - # Run the PEP 508 checker in the virtualenv. - cmd = _cmd + [Path(pep508checker_path).as_posix()] - c = run_command(cmd, is_verbose=project.s.is_verbose()) - results = [] - if c.returncode is not None: - try: - results = simplejson.loads(c.stdout.strip()) - except json.JSONDecodeError: - click.echo( - "{}\n{}\n{}".format( - click.style( - "Failed parsing pep508 results: ", - fg="white", - bold=True, - ), - c.stdout.strip(), - c.stderr.strip(), - ) - ) - sys.exit(1) - # Load the pipfile. - p = plette.Pipfile.load(open(project.pipfile_location)) - p = plette.Lockfile.with_meta_from(p) - failed = False - # Assert each specified requirement. - for marker, specifier in p._data["_meta"]["requires"].items(): - if marker in results: - try: - assert results[marker] == specifier - except AssertionError: - failed = True - click.echo( - "Specifier {} does not match {} ({})." - "".format( - click.style(marker, fg="green"), - click.style(specifier, fg="cyan"), - click.style(results[marker], fg="yellow"), - ), - err=True, - ) - if failed: - click.secho("Failed!", fg="red", err=True) - sys.exit(1) - else: - if not quiet and not project.s.is_quiet(): - click.secho("Passed!", fg="green") - # Check for lockfile exists + + results = run_pep508_check(project, system, python) + check_pep508_requirements(project, results, quiet) + if not project.lockfile_exists: return + if not quiet and not project.s.is_quiet(): if use_installed: click.secho( @@ -147,46 +227,22 @@ def do_check( "Checking Pipfile.lock packages for vulnerabilities...", bold=True, ) + if ignore: - if not isinstance(ignore, (tuple, list)): - ignore = [ignore] - ignored = [["--ignore", cve] for cve in ignore] + ignore = [ignore] if not isinstance(ignore, (tuple, list)) else ignore if not quiet and not project.s.is_quiet(): click.echo( "Notice: Ignoring Vulnerabilit{} {}".format( - "ies" if len(ignored) > 1 else "y", + "ies" if len(ignore) > 1 else "y", click.style(", ".join(ignore), fg="yellow"), ), err=True, ) - else: - ignored = [] - - if use_installed: - target_venv_packages = run_command( - _cmd + ["-m", "pip", "list", "--format=freeze"], - is_verbose=project.s.is_verbose(), - ) - elif categories: - target_venv_packages = run_command( - ["pipenv", "requirements", "--categories", categories], - is_verbose=project.s.is_verbose(), - ) - else: - target_venv_packages = run_command( - ["pipenv", "requirements"], is_verbose=project.s.is_verbose() - ) - temp_requirements = tempfile.NamedTemporaryFile( - mode="w+", - prefix=f"{project.virtualenv_name}", - suffix="_requirements.txt", - delete=False, - ) - temp_requirements.write(target_venv_packages.stdout.strip()) - temp_requirements.close() + requirements = get_requirements(project, use_installed, categories) + temp_requirements = create_temp_requirements(project, requirements) - options = build_options( + options = build_safety_options( audit_and_monitor=audit_and_monitor, exit_code=exit_code, output=output, @@ -196,14 +252,17 @@ def do_check( temp_requirements_name=temp_requirements.name, ) - cmd = _cmd + [safety_path, "check"] + options + safety_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "patched", "safety" + ) + cmd = [project_python(project, system=system), safety_path, "check"] + options if db: if not quiet and not project.s.is_quiet(): click.echo(f"Using {db} database") cmd.append(f"--db={db}") elif key or project.s.PIPENV_PYUP_API_KEY: - cmd = cmd + [f"--key={key or project.s.PIPENV_PYUP_API_KEY}"] + cmd.append(f"--key={key or project.s.PIPENV_PYUP_API_KEY}") else: PIPENV_SAFETY_DB = ( "https://d2qjmgddvqvu75.cloudfront.net/aws/safety/pipenv/1.0.0/" @@ -211,74 +270,21 @@ def do_check( os.environ["SAFETY_ANNOUNCEMENTS_URL"] = f"{PIPENV_SAFETY_DB}announcements.json" cmd.append(f"--db={PIPENV_SAFETY_DB}") - if ignored: - for cve in ignored: - cmd += cve + if ignore: + for cve in ignore: + cmd.extend(["--ignore", cve]) os.environ["SAFETY_CUSTOM_INTEGRATION"] = "True" os.environ["SAFETY_SOURCE"] = "pipenv" os.environ["SAFETY_PURE_YAML"] = "True" - from pipenv.patched.safety.cli import cli + output, error, exit_code = run_safety_check(cmd, quiet) - sys.argv = cmd[1:] - - if output == "minimal": - from contextlib import redirect_stderr, redirect_stdout - - code = 0 - - with redirect_stdout(io.StringIO()) as out, redirect_stderr(io.StringIO()) as err: - try: - cli(prog_name="pipenv") - except SystemExit as exit_signal: - code = exit_signal.code - - report = out.getvalue() - error = err.getvalue() - - try: - json_report = simplejson.loads(report) - except Exception: - raise exceptions.PipenvCmdError( - cmd_list_to_shell(cmd), report, error, exit_code=code - ) - meta = json_report.get("report_meta") - vulnerabilities_found = meta.get("vulnerabilities_found") - - fg = "green" - message = "All good!" - db_type = "commercial" if meta.get("api_key", False) else "free" - - if vulnerabilities_found >= 0: - fg = "red" - message = ( - f"Scan was complete using Safety’s {db_type} vulnerability database." - ) - - click.echo() - click.secho(f"{vulnerabilities_found} vulnerabilities found.", fg=fg) - click.echo() - - vulnerabilities = json_report.get("vulnerabilities", []) - - for vuln in vulnerabilities: - click.echo( - "{}: {} {} open to vulnerability {} ({}). More info: {}".format( - click.style(vuln["vulnerability_id"], bold=True, fg="red"), - click.style(vuln["package_name"], fg="green"), - click.style(vuln["analyzed_version"], fg="yellow", bold=True), - click.style(vuln["vulnerability_id"], bold=True), - click.style(vuln["vulnerable_spec"], fg="yellow", bold=False), - click.style(vuln["more_info_url"], bold=True), - ) - ) - click.echo(f"{vuln['advisory']}") - click.echo() - - click.secho(message, fg="white", bold=True) - sys.exit(code) - - cli(prog_name="pipenv") + if quiet: + parse_safety_output(output, quiet) + else: + sys.stdout.write(output) + sys.stderr.write(error) - temp_requirements.remove() + temp_requirements.unlink() + sys.exit(exit_code)