Skip to content

Commit

Permalink
repro: introduce --keep-going/--ignore to continue on error (#9712)
Browse files Browse the repository at this point in the history
* repro: introduce --keep-going/--ignore to continue on error

`-k`/`--keep-going` will skip all dependents of a stage if it fails.
It'll still try to reproduce the other dependencies of the pending
targets, before it exits with a non-zero status.

`--ignore-errors` will treat any error during stage execution as a
success, and will continue executing other dependencies as if it
succeeded. Exits with 0 even on failure.

Also fixes:
* --force-downstream used to force everything even if only one upstream node changed.
  This worked when we supported a single target, but there can be independent
  pipelines, so we should not force everything if any node changes.
  Now we'll force-reproduce a stage only if one of its dependencies changed upstream
  or its upstream node was also forced.

* Makes `--downstream` more correct, in relation to `--allow-missing` and `--pull`.
  Now we use an active graph for downstream/upstream logic instead of a subgraph.
  Note that this does not affect the downstream logic though, as dvc always goes
  downstream, so subgraph was no different than active graph.
  The subgraph simply did not contain the upstream nodes.

* add tests

* add spacing in the message
  • Loading branch information
skshetry authored Jul 12, 2023
1 parent 4c8f539 commit eb212a7
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 94 deletions.
21 changes: 21 additions & 0 deletions dvc/commands/repro.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def _common_kwargs(self):
"force_downstream": self.args.force_downstream,
"pull": self.args.pull,
"allow_missing": self.args.allow_missing,
"on_error": self.args.on_error,
}

@property
Expand Down Expand Up @@ -137,6 +138,26 @@ def add_arguments(repro_parser):
"Only print the commands that would be executed without actually executing."
),
)
repro_parser.add_argument(
"-k",
"--keep-going",
action="store_const",
default="fail",
const="keep-going",
dest="on_error",
help=(
"Continue executing, skipping stages having dependencies "
"on the failed stages"
),
)
repro_parser.add_argument(
"--ignore-errors",
action="store_const",
default="fail",
const="ignore",
dest="on_error",
help="Ignore errors from stages.",
)


def add_parser(subparsers, parent_parser):
Expand Down
4 changes: 1 addition & 3 deletions dvc/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,7 @@ class InitError(DvcException):


class ReproductionError(DvcException):
    """Raised when one or more stages fail to reproduce.

    The exception takes the complete, already formatted message. It no
    longer wraps a stage name itself, so callers that build a message such
    as ``"failed to reproduce stages: 'a' and 'b'"`` are not double-wrapped.
    """


class BadMetricError(DvcException):
Expand Down
139 changes: 100 additions & 39 deletions dvc/repo/reproduce.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
import logging
from typing import TYPE_CHECKING, Iterable, List, Optional, Union, cast
from typing import (
TYPE_CHECKING,
Callable,
Dict,
Iterable,
List,
NoReturn,
Optional,
Set,
Tuple,
TypeVar,
Union,
cast,
)

from funcy import ldistinct

from dvc.exceptions import ReproductionError
from dvc.repo.scm_context import scm_context
from dvc.stage.cache import RunCacheNotSupported
from dvc.utils import humanize
from dvc.utils.collections import ensure_list

from . import locked
Expand All @@ -18,6 +32,7 @@
from . import Repo

logger = logging.getLogger(__name__)
T = TypeVar("T")


def collect_stages(
Expand Down Expand Up @@ -50,7 +65,7 @@ def get_subgraph(
return nx.compose_all(used_pipelines)


def _remove_frozen_stages(graph: "DiGraph") -> "DiGraph":
def get_active_graph(graph: "DiGraph") -> "DiGraph":
g = cast("DiGraph", graph.copy())
for stage in graph:
if stage.frozen:
Expand All @@ -59,23 +74,12 @@ def _remove_frozen_stages(graph: "DiGraph") -> "DiGraph":
return g


def get_active_graph(
graph: "DiGraph",
stages: Optional[List["Stage"]] = None,
pipeline: bool = False,
downstream: bool = False,
) -> "DiGraph":
"""Return the graph to operate."""
processed = _remove_frozen_stages(graph)
return get_subgraph(processed, stages, pipeline=pipeline, downstream=downstream)


def plan_repro(
graph: "DiGraph",
stages: Optional[List["Stage"]] = None,
stages: Optional[List["T"]] = None,
pipeline: bool = False,
downstream: bool = False,
) -> List["Stage"]:
) -> List["T"]:
r"""Derive the evaluation of the given node for the given graph.
When you _reproduce a stage_, you want to _evaluate the descendants_
Expand Down Expand Up @@ -113,48 +117,103 @@ def plan_repro(
"""
import networkx as nx

active = get_active_graph(graph, stages, pipeline=pipeline, downstream=downstream)
return list(nx.dfs_postorder_nodes(active))
sub = get_subgraph(graph, stages, pipeline=pipeline, downstream=downstream)
return list(nx.dfs_postorder_nodes(sub))


def _reproduce_stage(stage: "Stage", **kwargs) -> Optional["Stage"]:
if stage.frozen and not stage.is_import:
logger.warning(
"%s is frozen. Its dependencies are not going to be reproduced.",
stage,
)
msg = "%s is frozen. Its dependencies are not going to be reproduced."
logger.warning(msg, stage)

ret = stage.reproduce(**kwargs)
if ret and not kwargs.get("dry", False):
stage.dump(update_pipeline=False)
return ret


def _reproduce_stages(
def _get_upstream_downstream_nodes(
graph: Optional["DiGraph"], node: T
) -> Tuple[List[T], List[T]]:
succ = list(graph.successors(node)) if graph else []
pre = list(graph.predecessors(node)) if graph else []
return succ, pre


def _repr(stages: Iterable["Stage"]) -> str:
    """Join the ``repr`` of every stage's ``addressing`` via ``humanize.join``."""
    addresses = (repr(stage.addressing) for stage in stages)
    return humanize.join(addresses)


def handle_error(
    graph: Optional["DiGraph"], on_error: str, exc: Exception, stage: "Stage"
) -> Set["Stage"]:
    """Log a stage failure and work out which stages must be skipped.

    Args:
        graph: graph used to find the dependents of ``stage``; may be falsy.
        on_error: error mode; ``"ignore"`` means nothing is skipped.
        exc: the exception raised while reproducing ``stage``.
        stage: the stage that failed.

    Returns:
        The nodes reachable from ``stage`` in the reversed graph (excluding
        ``stage`` itself), or an empty set when there is no graph or errors
        are being ignored.
    """
    import networkx as nx

    ignoring = on_error == "ignore"
    logger.warning("%s%s", exc, " (ignored)" if ignoring else "")
    if ignoring or not graph:
        return set()

    # Walk the reversed graph from the failed stage to collect everything
    # that depends on it, then drop the failed stage itself.
    dependents = set(nx.dfs_postorder_nodes(graph.reverse(), stage))
    dependents.discard(stage)
    if dependents:
        names = _repr(dependents)
        label = "Stages" if len(dependents) > 1 else "Stage"
        logger.warning("%s %s will be skipped due to this failure", label, names)
    return dependents


def _raise_error(exc: Optional[Exception], *stages: "Stage") -> NoReturn:
    """Raise ``ReproductionError`` naming the failed stages, chained from ``exc``.

    The message reads ``failed to reproduce <name>`` for a single stage and
    ``failed to reproduce stages: <names>`` when several stages are given.
    """
    several = len(stages) > 1
    segment = " stages:" if several else ""
    names = _repr(stages)
    error = ReproductionError(f"failed to reproduce{segment} {names}")
    raise error from exc


def _reproduce(
    stages: List["Stage"],
    graph: Optional["DiGraph"] = None,
    force_downstream: bool = False,
    on_error: str = "fail",
    force: bool = False,
    repro_fn: Callable = _reproduce_stage,
    **kwargs,
) -> List["Stage"]:
    """Reproduce ``stages`` in order and collect the ones that produced a result.

    Args:
        stages: stages to run, already in execution order.
        graph: when given, used to look up each stage's neighbours and the
            dependents to skip after a failure.
        force_downstream: when true, a stage that was reproduced (or was
            itself forced) marks its downstream nodes as forced.
        on_error: ``"fail"`` raises on the first failure; ``"keep-going"``
            skips the dependents of a failed stage, keeps running the rest
            and raises at the end; ``"ignore"`` logs the failure, skips
            nothing and never raises.
        force: initial force flag applied to every stage.
        repro_fn: callable invoked per stage as
            ``repro_fn(stage, upstream=..., force=..., **kwargs)``.
        **kwargs: forwarded unchanged to ``repro_fn``.

    Returns:
        The truthy results returned by ``repro_fn``, in execution order.

    Raises:
        ReproductionError: immediately in ``"fail"`` mode, or after all
            runnable stages were tried in ``"keep-going"`` mode.
    """
    assert on_error in ("fail", "keep-going", "ignore")

    result: List["Stage"] = []
    failed: List["Stage"] = []
    # Maps a stage that must be skipped to the failed stage that caused it.
    to_skip: Dict["Stage", "Stage"] = {}
    ret: Optional["Stage"] = None

    # Per-stage force flag, so that forcing propagates only along edges of
    # the graph instead of to every stage that happens to run later.
    force_state = {node: force for node in stages}

    for stage in stages:
        if stage in to_skip:
            continue

        if ret:
            logger.info("")  # add a newline

        upstream, downstream = _get_upstream_downstream_nodes(graph, stage)
        force_stage = force_state[stage]

        try:
            ret = repro_fn(stage, upstream=upstream, force=force_stage, **kwargs)
        except Exception as exc:  # noqa: BLE001, pylint: disable=broad-exception-caught
            failed.append(stage)
            if on_error == "fail":
                _raise_error(exc, stage)

            dependents = handle_error(graph, on_error, exc, stage)
            to_skip.update({node: stage for node in dependents})
            continue

        if force_downstream and (ret or force_stage):
            force_state.update({node: True for node in downstream})

        if ret:
            result.append(ret)

    # "keep-going" still reports the failures once everything else has run.
    if on_error != "ignore" and failed:
        _raise_error(None, *failed)
    return result


Expand All @@ -169,6 +228,7 @@ def reproduce(
downstream: bool = False,
single_item: bool = False,
glob: bool = False,
on_error: Optional[str] = "fail",
**kwargs,
):
from dvc.dvcfile import PROJECT_FILE
Expand All @@ -192,8 +252,9 @@ def reproduce(
except RunCacheNotSupported as e:
logger.warning("Failed to pull run cache: %s", e)

graph = None
steps = stages
if not single_item:
graph = self.index.graph
graph = get_active_graph(self.index.graph)
steps = plan_repro(graph, stages, pipeline=pipeline, downstream=downstream)
return _reproduce_stages(steps, **kwargs)
return _reproduce(steps, graph=graph, on_error=on_error or "fail", **kwargs)
62 changes: 62 additions & 0 deletions tests/func/repro/test_repro.py
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,18 @@ def test_repro_force_downstream(tmp_dir, dvc, copy_script):
assert stages[1].addressing == file3_stage.addressing


def test_repro_force_downstream_do_not_force_independent_stages(tmp_dir, dvc, run_copy):
    """Changing ``foo`` must not force the independent ``bar`` pipeline."""
    tmp_dir.gen({"foo": "foo", "bar": "bar"})
    foo1_stage = run_copy("foo", "foo1", name="foo1")
    foo2_stage = run_copy("foo1", "foo2", name="foo2")
    run_copy("bar", "bar1", name="bar1")
    run_copy("bar1", "bar2", name="bar2")
    cat_stage = dvc.run(cmd="cat bar2 foo2", deps=["foo2", "bar2"], name="cat")

    # Only the "foo" branch is modified before reproducing.
    tmp_dir.gen("foo", "foobar")
    reproduced = dvc.reproduce(force_downstream=True)
    assert reproduced == [foo1_stage, foo2_stage, cat_stage]


def test_repro_pipeline(
tmp_dir,
dvc,
Expand Down Expand Up @@ -1296,3 +1308,53 @@ def test_repro_single_item_with_multiple_targets(tmp_dir, dvc, copy_script):
stage2,
stage1,
]


def test_repro_keep_going(mocker, tmp_dir, dvc, copy_script):
    """With ``on_error="keep-going"`` the failed stage's dependent is never run."""
    from dvc.repo import reproduce

    bar_stage, foo_stage = tmp_dir.dvc_gen({"bar": "bar", "foo": "foo"})
    failing_stage = dvc.stage.add(
        cmd=["python copy.py bar foobar", "exit 1"],
        deps=["bar"],
        outs=["foobar"],
        name="copy-bar-foobar",
    )
    # "cat" depends on the failing stage's output, so it must not be called.
    dvc.stage.add(cmd="cat foobar foo", deps=["foobar", "foo"], name="cat")
    spy = mocker.spy(reproduce, "_reproduce_stage")

    with pytest.raises(ReproductionError):
        dvc.reproduce(on_error="keep-going", repro_fn=spy)

    call = mocker.call
    expected_calls = [
        call(bar_stage, upstream=[], force=False, interactive=False),
        call(failing_stage, upstream=[bar_stage], force=False, interactive=False),
        call(foo_stage, upstream=[], force=False, interactive=False),
    ]
    assert spy.call_args_list == expected_calls


def test_repro_ignore_errors(M, mocker, tmp_dir, dvc, copy_script):
    """With ``on_error="ignore"`` every stage is attempted and nothing is raised."""
    from dvc.repo import reproduce

    bar_stage, foo_stage = tmp_dir.dvc_gen({"bar": "bar", "foo": "foo"})
    failing_stage = dvc.stage.add(
        cmd=["python copy.py bar foobar", "exit 1"],
        deps=["bar"],
        outs=["foobar"],
        name="copy-bar-foobar",
    )
    cat_stage = dvc.stage.add(
        cmd="cat foobar foo", deps=["foobar", "foo"], name="cat"
    )
    spy = mocker.spy(reproduce, "_reproduce_stage")
    dvc.reproduce(on_error="ignore", repro_fn=spy)

    call = mocker.call
    expected_calls = [
        call(bar_stage, upstream=[], force=False, interactive=False),
        call(failing_stage, upstream=[bar_stage], force=False, interactive=False),
        call(foo_stage, upstream=[], force=False, interactive=False),
        # The dependent of the failed stage still runs in "ignore" mode.
        call(
            cat_stage,
            upstream=[foo_stage, failing_stage],
            force=False,
            interactive=False,
        ),
    ]
    assert spy.call_args_list == expected_calls
1 change: 1 addition & 0 deletions tests/unit/command/test_repro.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"pull": False,
"allow_missing": False,
"targets": [],
"on_error": "fail",
}
repro_arguments = {
"run_cache": True,
Expand Down
Loading

0 comments on commit eb212a7

Please sign in to comment.