From eb212a7ba3948582221c858d99147d57816d74e1 Mon Sep 17 00:00:00 2001 From: skshetry <18718008+skshetry@users.noreply.github.com> Date: Wed, 12 Jul 2023 21:52:41 +0545 Subject: [PATCH] repro: introduce --keep-going/--ignore to continue on error (#9712) * repro: introduce --keep-going/--ignore to continue on error `-k`/`--keep-going` will skip all dependents of a stage if it fails. It'll still try to reproduce the other dependencies of the pending targets, before it exits with a non-zero status. `--ignore-errors` will treat any error during stage execution as a success, and will continue executing other dependencies as if it succeeded. Exits with 0 even on failure. Also fixes: * --force-downstream used to force everything even if one upstream node changed. This used to work when we supported only a single target, but there could be independent pipelines, so we should not force everything if any node changes. Now we'll force reproduce the stage only if one of its dependencies changed upstream or its upstream node was also forced. * Makes `--downstream` more correct, in relation to `--allow-missing` and `--pull`. Now we use an active graph for downstream/upstream logic instead of a subgraph. Note that this does not affect the downstream logic though, as dvc always goes downstream, so the subgraph was no different from the active graph. The subgraph merely lacked the upstream nodes. 
* add tests * add spacing in the message --- dvc/commands/repro.py | 21 +++++ dvc/exceptions.py | 4 +- dvc/repo/reproduce.py | 139 +++++++++++++++++++++--------- tests/func/repro/test_repro.py | 62 +++++++++++++ tests/unit/command/test_repro.py | 1 + tests/unit/repo/test_reproduce.py | 105 +++++++++++----------- 6 files changed, 238 insertions(+), 94 deletions(-) diff --git a/dvc/commands/repro.py b/dvc/commands/repro.py index f5fb50fff9..6a448142b3 100644 --- a/dvc/commands/repro.py +++ b/dvc/commands/repro.py @@ -33,6 +33,7 @@ def _common_kwargs(self): "force_downstream": self.args.force_downstream, "pull": self.args.pull, "allow_missing": self.args.allow_missing, + "on_error": self.args.on_error, } @property @@ -137,6 +138,26 @@ def add_arguments(repro_parser): "Only print the commands that would be executed without actually executing." ), ) + repro_parser.add_argument( + "-k", + "--keep-going", + action="store_const", + default="fail", + const="keep-going", + dest="on_error", + help=( + "Continue executing, skipping stages having dependencies " + "on the failed stages" + ), + ) + repro_parser.add_argument( + "--ignore-errors", + action="store_const", + default="fail", + const="ignore", + dest="on_error", + help="Ignore errors from stages.", + ) def add_parser(subparsers, parent_parser): diff --git a/dvc/exceptions.py b/dvc/exceptions.py index 1ca14ec483..618eeaef49 100644 --- a/dvc/exceptions.py +++ b/dvc/exceptions.py @@ -167,9 +167,7 @@ class InitError(DvcException): class ReproductionError(DvcException): - def __init__(self, name): - self.name = name - super().__init__(f"failed to reproduce '{name}'") + pass class BadMetricError(DvcException): diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index 65ee8c7359..666b12e024 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -1,11 +1,25 @@ import logging -from typing import TYPE_CHECKING, Iterable, List, Optional, Union, cast +from typing import ( + TYPE_CHECKING, + Callable, + Dict, + 
Iterable, + List, + NoReturn, + Optional, + Set, + Tuple, + TypeVar, + Union, + cast, +) from funcy import ldistinct from dvc.exceptions import ReproductionError from dvc.repo.scm_context import scm_context from dvc.stage.cache import RunCacheNotSupported +from dvc.utils import humanize from dvc.utils.collections import ensure_list from . import locked @@ -18,6 +32,7 @@ from . import Repo logger = logging.getLogger(__name__) +T = TypeVar("T") def collect_stages( @@ -50,7 +65,7 @@ def get_subgraph( return nx.compose_all(used_pipelines) -def _remove_frozen_stages(graph: "DiGraph") -> "DiGraph": +def get_active_graph(graph: "DiGraph") -> "DiGraph": g = cast("DiGraph", graph.copy()) for stage in graph: if stage.frozen: @@ -59,23 +74,12 @@ def _remove_frozen_stages(graph: "DiGraph") -> "DiGraph": return g -def get_active_graph( - graph: "DiGraph", - stages: Optional[List["Stage"]] = None, - pipeline: bool = False, - downstream: bool = False, -) -> "DiGraph": - """Return the graph to operate.""" - processed = _remove_frozen_stages(graph) - return get_subgraph(processed, stages, pipeline=pipeline, downstream=downstream) - - def plan_repro( graph: "DiGraph", - stages: Optional[List["Stage"]] = None, + stages: Optional[List["T"]] = None, pipeline: bool = False, downstream: bool = False, -) -> List["Stage"]: +) -> List["T"]: r"""Derive the evaluation of the given node for the given graph. When you _reproduce a stage_, you want to _evaluate the descendants_ @@ -113,16 +117,14 @@ def plan_repro( """ import networkx as nx - active = get_active_graph(graph, stages, pipeline=pipeline, downstream=downstream) - return list(nx.dfs_postorder_nodes(active)) + sub = get_subgraph(graph, stages, pipeline=pipeline, downstream=downstream) + return list(nx.dfs_postorder_nodes(sub)) def _reproduce_stage(stage: "Stage", **kwargs) -> Optional["Stage"]: if stage.frozen and not stage.is_import: - logger.warning( - "%s is frozen. 
Its dependencies are not going to be reproduced.", - stage, - ) + msg = "%s is frozen. Its dependencies are not going to be reproduced." + logger.warning(msg, stage) ret = stage.reproduce(**kwargs) if ret and not kwargs.get("dry", False): @@ -130,31 +132,88 @@ def _reproduce_stage(stage: "Stage", **kwargs) -> Optional["Stage"]: return ret -def _reproduce_stages( +def _get_upstream_downstream_nodes( + graph: Optional["DiGraph"], node: T +) -> Tuple[List[T], List[T]]: + succ = list(graph.successors(node)) if graph else [] + pre = list(graph.predecessors(node)) if graph else [] + return succ, pre + + +def _repr(stages: Iterable["Stage"]) -> str: + return humanize.join(repr(stage.addressing) for stage in stages) + + +def handle_error( + graph: Optional["DiGraph"], on_error: str, exc: Exception, stage: "Stage" +) -> Set["Stage"]: + import networkx as nx + + logger.warning("%s%s", exc, " (ignored)" if on_error == "ignore" else "") + if not graph or on_error == "ignore": + return set() + + dependents = set(nx.dfs_postorder_nodes(graph.reverse(), stage)) - {stage} + if dependents: + names = _repr(dependents) + msg = "%s %s will be skipped due to this failure" + logger.warning(msg, "Stages" if len(dependents) > 1 else "Stage", names) + return dependents + + +def _raise_error(exc: Optional[Exception], *stages: "Stage") -> NoReturn: + names = _repr(stages) + segment = " stages:" if len(stages) > 1 else "" + raise ReproductionError(f"failed to reproduce{segment} {names}") from exc + + +def _reproduce( stages: List["Stage"], + graph: Optional["DiGraph"] = None, force_downstream: bool = False, + on_error: str = "fail", + force: bool = False, + repro_fn: Callable = _reproduce_stage, **kwargs, ) -> List["Stage"]: + assert on_error in ("fail", "keep-going", "ignore") + result: List["Stage"] = [] - for i, stage in enumerate(stages): - try: - ret = _reproduce_stage(stage, upstream=stages[:i], **kwargs) - except Exception as exc: # noqa: BLE001 - raise 
ReproductionError(stage.addressing) from exc + failed: List["Stage"] = [] + to_skip: Dict["Stage", "Stage"] = {} + ret: Optional["Stage"] = None + + force_state = {node: force for node in stages} - if not ret: + for stage in stages: + if stage in to_skip: continue - result.append(ret) - if force_downstream: - # NOTE: we are walking our pipeline from the top to the - # bottom. If one stage is changed, it will be reproduced, - # which tells us that we should force reproducing all of - # the other stages down below, even if their direct - # dependencies didn't change. - kwargs["force"] = True - if i < len(stages) - 1: + if ret: logger.info("") # add a newline + + upstream, downstream = _get_upstream_downstream_nodes(graph, stage) + force_stage = force_state[stage] + + try: + ret = repro_fn(stage, upstream=upstream, force=force_stage, **kwargs) + except Exception as exc: # noqa: BLE001, pylint: disable=broad-exception-caught + failed.append(stage) + if on_error == "fail": + _raise_error(exc, stage) + + dependents = handle_error(graph, on_error, exc, stage) + to_skip.update({node: stage for node in dependents}) + continue + + if force_downstream and (ret or force_stage): + force_state.update({node: True for node in downstream}) + + if ret: + result.append(ret) + + if on_error != "ignore" and failed: + _raise_error(None, *failed) return result @@ -169,6 +228,7 @@ def reproduce( downstream: bool = False, single_item: bool = False, glob: bool = False, + on_error: Optional[str] = "fail", **kwargs, ): from dvc.dvcfile import PROJECT_FILE @@ -192,8 +252,9 @@ def reproduce( except RunCacheNotSupported as e: logger.warning("Failed to pull run cache: %s", e) + graph = None steps = stages if not single_item: - graph = self.index.graph + graph = get_active_graph(self.index.graph) steps = plan_repro(graph, stages, pipeline=pipeline, downstream=downstream) - return _reproduce_stages(steps, **kwargs) + return _reproduce(steps, graph=graph, on_error=on_error or "fail", **kwargs) diff 
--git a/tests/func/repro/test_repro.py b/tests/func/repro/test_repro.py index a2b10cb0d5..64047dc786 100644 --- a/tests/func/repro/test_repro.py +++ b/tests/func/repro/test_repro.py @@ -943,6 +943,18 @@ def test_repro_force_downstream(tmp_dir, dvc, copy_script): assert stages[1].addressing == file3_stage.addressing +def test_repro_force_downstream_do_not_force_independent_stages(tmp_dir, dvc, run_copy): + tmp_dir.gen({"foo": "foo", "bar": "bar"}) + foo1 = run_copy("foo", "foo1", name="foo1") + foo2 = run_copy("foo1", "foo2", name="foo2") + run_copy("bar", "bar1", name="bar1") + run_copy("bar1", "bar2", name="bar2") + cat = dvc.run(cmd="cat bar2 foo2", deps=["foo2", "bar2"], name="cat") + + tmp_dir.gen("foo", "foobar") + assert dvc.reproduce(force_downstream=True) == [foo1, foo2, cat] + + def test_repro_pipeline( tmp_dir, dvc, @@ -1296,3 +1308,53 @@ def test_repro_single_item_with_multiple_targets(tmp_dir, dvc, copy_script): stage2, stage1, ] + + +def test_repro_keep_going(mocker, tmp_dir, dvc, copy_script): + from dvc.repo import reproduce + + (bar_stage, foo_stage) = tmp_dir.dvc_gen({"bar": "bar", "foo": "foo"}) + stage1 = dvc.stage.add( + cmd=["python copy.py bar foobar", "exit 1"], + deps=["bar"], + outs=["foobar"], + name="copy-bar-foobar", + ) + dvc.stage.add(cmd="cat foobar foo", deps=["foobar", "foo"], name="cat") + spy = mocker.spy(reproduce, "_reproduce_stage") + + with pytest.raises(ReproductionError): + dvc.reproduce(on_error="keep-going", repro_fn=spy) + + assert spy.call_args_list == [ + mocker.call(bar_stage, upstream=[], force=False, interactive=False), + mocker.call(stage1, upstream=[bar_stage], force=False, interactive=False), + mocker.call(foo_stage, upstream=[], force=False, interactive=False), + ] + + +def test_repro_ignore_errors(M, mocker, tmp_dir, dvc, copy_script): + from dvc.repo import reproduce + + (bar_stage, foo_stage) = tmp_dir.dvc_gen({"bar": "bar", "foo": "foo"}) + stage1 = dvc.stage.add( + cmd=["python copy.py bar foobar", "exit 
1"], + deps=["bar"], + outs=["foobar"], + name="copy-bar-foobar", + ) + stage2 = dvc.stage.add(cmd="cat foobar foo", deps=["foobar", "foo"], name="cat") + spy = mocker.spy(reproduce, "_reproduce_stage") + dvc.reproduce(on_error="ignore", repro_fn=spy) + + assert spy.call_args_list == [ + mocker.call(bar_stage, upstream=[], force=False, interactive=False), + mocker.call(stage1, upstream=[bar_stage], force=False, interactive=False), + mocker.call(foo_stage, upstream=[], force=False, interactive=False), + mocker.call( + stage2, + upstream=[foo_stage, stage1], + force=False, + interactive=False, + ), + ] diff --git a/tests/unit/command/test_repro.py b/tests/unit/command/test_repro.py index 3474ce0c9b..7010ee466b 100644 --- a/tests/unit/command/test_repro.py +++ b/tests/unit/command/test_repro.py @@ -14,6 +14,7 @@ "pull": False, "allow_missing": False, "targets": [], + "on_error": "fail", } repro_arguments = { "run_cache": True, diff --git a/tests/unit/repo/test_reproduce.py b/tests/unit/repo/test_reproduce.py index a360284dcc..cbc9580d7b 100644 --- a/tests/unit/repo/test_reproduce.py +++ b/tests/unit/repo/test_reproduce.py @@ -1,70 +1,71 @@ from itertools import chain from networkx import DiGraph +from networkx.utils import graphs_equal -from dvc.repo.reproduce import plan_repro +from dvc.repo.reproduce import get_active_graph, plan_repro -def test_number_reproduces(tmp_dir, dvc, mocker): - reproduce_stage_mock = mocker.patch( - "dvc.repo.reproduce._reproduce_stage", returns=[] - ) - tmp_dir.dvc_gen({"pre-foo": "pre-foo"}) - - dvc.run(name="echo-foo", outs=["foo"], cmd="echo foo > foo") - dvc.run(name="echo-bar", deps=["foo"], outs=["bar"], cmd="echo bar > bar") - dvc.run(name="echo-baz", deps=["foo"], outs=["baz"], cmd="echo baz > baz") - dvc.run(name="echo-boop", deps=["bar"], outs=["boop"], cmd="echo boop > boop") +def test_active_graph(mocker): + n = mocker.sentinel + n1, n2, n3, n4, n5, n6, n7, n8, n9 = (getattr(n, f"n{i}") for i in range(1, 10)) + edges = {n1: 
[n2, n3], n2: [n4, n5], n3: [n6, n7], n8: [n9]} + for node in chain.from_iterable([n, *v] for n, v in edges.items()): + node.frozen = False - reproduce_stage_mock.reset_mock() + g = DiGraph(edges) - dvc.reproduce(all_pipelines=True) + active = get_active_graph(g) + assert graphs_equal(g, active) - assert reproduce_stage_mock.call_count == 5 + n2.frozen = True + active = get_active_graph(g) + assert g.edges() - active.edges() == {(n2, n5), (n2, n4)} + assert n2 in active + assert not active.edges() - g.edges() + assert not graphs_equal(g, active) -def test_repro_plan(mocker): +def test_repro_plan(M): r""" - n1 - / \ - n2 m3 n8 - / \ / \ | - n4 n5 n6 n7 n9 + 1 + / \ + 2 3 8 + / \ / \ | + 4 5 6 7 9 """ + g = DiGraph({1: [2, 3], 2: [4, 5], 3: [6, 7], 8: [9]}) - # note: downstream steps may not be stable, use AnyOf in such cases - class AnyOf: - def __init__(self, *items): - self.items = items + assert plan_repro(g) == [4, 5, 2, 6, 7, 3, 1, 9, 8] + assert plan_repro(g, [1]) == [4, 5, 2, 6, 7, 3, 1] + assert plan_repro(g, [4], downstream=True) == [4, 2, 1] + assert plan_repro(g, [8], True) == plan_repro(g, [9], True) == [9, 8] + assert plan_repro(g, [2, 8], True) == [4, 5, 2, 6, 7, 3, 1, 9, 8] + assert plan_repro(g, [2, 3], downstream=True) == [ + M.any_of(2, 3), + M.any_of(2, 3), + 1, + ] - def __eq__(self, other: object) -> bool: - return any(item == other for item in self.items) - n = mocker.sentinel - n1, n2, n3, n4, n5, n6, n7, n8, n9 = (getattr(n, f"n{i}") for i in range(1, 10)) - edges = {n1: [n2, n3], n2: [n4, n5], n3: [n6, n7], n8: [n9]} - for node in chain.from_iterable([n, *v] for n, v in edges.items()): - node.frozen = False +def test_number_reproduces(tmp_dir, dvc, mocker): + mock = mocker.Mock(return_value=None) + tmp_dir.dvc_gen({"pre-foo": "pre-foo"}) - g = DiGraph(edges) - assert plan_repro(g) == [n4, n5, n2, n6, n7, n3, n1, n9, n8] - assert plan_repro(g, [n1]) == [n4, n5, n2, n6, n7, n3, n1] - assert plan_repro(g, [n4], downstream=True) == [n4, n2, n1] 
- assert plan_repro(g, [n8], True) == plan_repro(g, [n9], True) == [n9, n8] - assert plan_repro(g, [n2, n8], True) == [n4, n5, n2, n6, n7, n3, n1, n9, n8] - assert plan_repro(g, [n2, n3], downstream=True) == [ - AnyOf(n2, n3), - AnyOf(n2, n3), - n1, - ] + dvc.stage.add(name="echo-foo", outs=["foo"], cmd="echo foo > foo", verify=False) + dvc.stage.add( + name="echo-bar", deps=["foo"], outs=["bar"], cmd="echo bar > bar", verify=False + ) + dvc.stage.add( + name="echo-baz", deps=["foo"], outs=["baz"], cmd="echo baz > baz", verify=False + ) + dvc.stage.add( + name="echo-boop", + deps=["bar"], + outs=["boop"], + cmd="echo boop > boop", + verify=False, + ) - n2.frozen = True - assert plan_repro(g) == [n2, n6, n7, n3, n1, n9, n8, n4, n5] - assert plan_repro(g, [n1]) == [n2, n6, n7, n3, n1] - assert plan_repro(g, [n4], downstream=True) == [n4] - assert plan_repro(g, [n2, n8], pipeline=True) == [n2, n6, n7, n3, n1, n9, n8] - assert plan_repro(g, [n2, n3], downstream=True) == [ - AnyOf(n2, n3), - AnyOf(n2, n3), - n1, - ] + dvc.reproduce(all_pipelines=True, repro_fn=mock) + assert mock.call_count == 5