Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dvcfs.get: simplify and separate files into respective filesystems while walking #10544

Merged
Merged 1 commit into the base branch on Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dvc/dependency/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ def download(self, to: "Output", jobs: Optional[int] = None):
return

hashes: list[tuple[str, HashInfo, dict[str, Any]]] = []
for src_path, dest_path, *rest in files:
for src_path, dest_path, maybe_info in files:
try:
info = rest[0] if rest else self.fs.info(src_path)
info = maybe_info or self.fs.info(src_path)
hash_info = info["dvc_info"]["entry"].hash_info
dest_info = to.fs.info(dest_path)
except (KeyError, AttributeError):
Expand Down
7 changes: 4 additions & 3 deletions dvc/fs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import glob
from typing import Optional, Union
from itertools import repeat
from typing import Optional
from urllib.parse import urlparse

from dvc.config import ConfigError as RepoConfigError
Expand Down Expand Up @@ -47,7 +48,7 @@

def download(
fs: "FileSystem", fs_path: str, to: str, jobs: Optional[int] = None
) -> list[Union[tuple[str, str], tuple[str, str, dict]]]:
) -> list[tuple[str, str, Optional[dict]]]:
from dvc.scm import lfs_prefetch

from .callbacks import TqdmCallback
Expand Down Expand Up @@ -84,7 +85,7 @@ def download(
cb.set_size(len(from_infos))
jobs = jobs or fs.jobs
generic.copy(fs, from_infos, localfs, to_infos, callback=cb, batch_size=jobs)
return list(zip(from_infos, to_infos))
return list(zip(from_infos, to_infos, repeat(None)))


def parse_external_url(url, fs_config=None, config=None):
Expand Down
63 changes: 39 additions & 24 deletions dvc/fs/dvc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import os
import posixpath
import threading
from collections import deque
from contextlib import ExitStack, suppress
from collections import defaultdict, deque
from contextlib import ExitStack, nullcontext, suppress
from glob import has_magic
from typing import TYPE_CHECKING, Any, Callable, Optional, Union

Expand All @@ -19,6 +19,8 @@
from .data import DataFileSystem

if TYPE_CHECKING:
from contextlib import AbstractContextManager

from dvc.repo import Repo
from dvc.types import DictStrAny, StrPath

Expand Down Expand Up @@ -498,7 +500,7 @@ def get(
**kwargs,
)

def _get( # noqa: C901
def _get( # noqa: C901, PLR0912
self,
rpath,
lpath,
Expand All @@ -507,7 +509,7 @@ def _get( # noqa: C901
maxdepth=None,
batch_size=None,
**kwargs,
) -> list[Union[tuple[str, str], tuple[str, str, dict]]]:
) -> list[tuple[str, str, Optional[dict]]]:
if (
isinstance(rpath, list)
or isinstance(lpath, list)
Expand All @@ -531,10 +533,13 @@ def _get( # noqa: C901
if self.isfile(rpath):
with callback.branched(rpath, lpath) as child:
self.get_file(rpath, lpath, callback=child, **kwargs)
return [(rpath, lpath)]
return [(rpath, lpath, None)]

_files = []
result: list[tuple[str, str, Optional[dict]]] = []
_dirs: list[str] = []
_files: dict[FileSystem, list[tuple[str, str, Optional[dict]]]]
_files = defaultdict(list)

for root, dirs, files in self.walk(rpath, maxdepth=maxdepth, detail=True):
if files:
callback.set_size((callback.size or 0) + len(files))
Expand All @@ -550,32 +555,42 @@ def _get( # noqa: C901
_, dvc_fs, _ = self._get_subrepo_info(key)

for name, info in files.items():
dvc_info = info.get("dvc_info")
fs_info = info.get("fs_info")
if dvc_fs and dvc_info and not fs_info:
fs = dvc_fs
fs_path = dvc_info["name"]
else:
fs = self.repo.fs
fs_path = fs_info["name"]

src_path = f"{root}{self.sep}{name}"
dest_path = f"{dest_root}{os.path.sep}{name}"
_files.append((dvc_fs, src_path, dest_path, info))
_files[fs].append((fs_path, dest_path, dvc_info))
result.append((src_path, dest_path, info))

os.makedirs(lpath, exist_ok=True)
for d in _dirs:
os.mkdir(d)

repo_fs = self.repo.fs
def get_file(arg: tuple[FileSystem, tuple[str, str, Optional[dict]]]):
fs, (src, dest, info) = arg
kw = kwargs
if isinstance(fs, DataFileSystem):
kw = kw | {"info": info}
return fs.get_file(src, dest, callback=callback, **kw)

def _get_file(arg):
dvc_fs, src, dest, info = arg
dvc_info = info.get("dvc_info")
fs_info = info.get("fs_info")
if dvc_fs and dvc_info and not fs_info:
dvc_path = dvc_info["name"]
dvc_fs.get_file(
dvc_path, dest, callback=callback, info=dvc_info, **kwargs
)
else:
fs_path = fs_info["name"]
repo_fs.get_file(fs_path, dest, callback=callback, **kwargs)
return src, dest, info
if batch_size == 1:
ctx: AbstractContextManager = nullcontext()
map_fn: Callable = map
else:
ctx = ThreadPoolExecutor(max_workers=batch_size)
map_fn = ctx.imap_unordered

with ThreadPoolExecutor(max_workers=batch_size) as executor:
return list(executor.imap_unordered(_get_file, _files))
with ctx:
it = ((fs, f) for fs, files in _files.items() for f in files)
deque(map_fn(get_file, it), maxlen=0)
return result

def get_file(self, rpath, lpath, **kwargs):
dvc_info = kwargs.pop("info", {}).pop("dvc_info", None)
Expand Down Expand Up @@ -670,7 +685,7 @@ def _get(
recursive: bool = False,
batch_size: Optional[int] = None,
**kwargs,
) -> list[Union[tuple[str, str], tuple[str, str, dict]]]:
) -> list[tuple[str, str, Optional[dict]]]:
# FileSystem.get is non-recursive by default if arguments are lists
# otherwise, it's recursive.
recursive = not (isinstance(from_info, list) and isinstance(to_info, list))
Expand Down
Loading