Skip to content

Commit

Permalink
remove 3.8 compat for executors; update ruff and cleanup (#283)
Browse files Browse the repository at this point in the history
  • Loading branch information
skshetry authored Feb 28, 2024
1 parent 2039bd7 commit 2448fd1
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 64 deletions.
7 changes: 1 addition & 6 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ repos:
- id: sort-simple-yaml
- id: trailing-whitespace
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: 'v0.1.7'
rev: 'v0.2.2'
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
Expand All @@ -30,8 +30,3 @@ repos:
hooks:
- id: codespell
additional_dependencies: ["tomli"]
- repo: https://github.com/asottile/pyupgrade
rev: v3.15.0
hooks:
- id: pyupgrade
args: [--py38-plus]
6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ module = [
ignore-words-list = "cachable,"

[tool.ruff]
output-format = "full"
show-fixes = true

[tool.ruff.lint]
ignore = [
"ISC001", # single-line-implicit-string-concatenation
"PLR2004", # magic-value-comparison
Expand Down Expand Up @@ -155,8 +159,6 @@ select = [
"W", # pycodestyle - Warning
"YTT", # flake8-2020
]
show-source = true
show-fixes = true

[tool.ruff.lint.flake8-pytest-style]
fixture-parentheses = false
Expand Down
11 changes: 4 additions & 7 deletions src/dvc_objects/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,11 @@ def _oids_with_limit(
prefixes: Optional[Iterable[str]] = None,
jobs: Optional[int] = None,
) -> Iterator[str]:
count = 0
for oid in self._list_oids(prefixes=prefixes, jobs=jobs):
for i, oid in enumerate(self._list_oids(prefixes=prefixes, jobs=jobs), start=1):
yield oid
count += 1
if count > limit:
if i > limit:
logger.debug(
"`_list_oids()` returned max %r oids, "
"skipping remaining results",
"`_list_oids()` returned max %r oids, skipping remaining results",
limit,
)
return
Expand Down Expand Up @@ -348,7 +345,7 @@ def _list_oids_traverse(self, remote_size, remote_oids, jobs=None):

yield from self._list_oids(prefixes=traverse_prefixes, jobs=jobs)

def all(self, jobs=None): # noqa: A003
def all(self, jobs=None):
"""Iterate over all oids in this fs.
Hashes will be fetched in parallel threads according to prefix
Expand Down
50 changes: 5 additions & 45 deletions src/dvc_objects/executors.py
Original file line number Diff line number Diff line change
@@ -1,87 +1,47 @@
import asyncio
import queue
import sys
from collections.abc import Coroutine, Iterable, Iterator, Sequence
from concurrent import futures
from itertools import islice
from typing import (
Any,
Callable,
Optional,
TypeVar,
)
from typing import Any, Callable, Optional, TypeVar

from fsspec import Callback

_T = TypeVar("_T")


class ThreadPoolExecutor(futures.ThreadPoolExecutor):
_max_workers: int

def __init__(
self, max_workers: Optional[int] = None, cancel_on_error: bool = False, **kwargs
):
super().__init__(max_workers=max_workers, **kwargs)
self._cancel_on_error = cancel_on_error

@property
def max_workers(self) -> int:
return self._max_workers

def imap_unordered(
self, fn: Callable[..., _T], *iterables: Iterable[Any]
) -> Iterator[_T]:
"""Lazier version of map that does not preserve ordering of results.
It does not create all the futures at once to reduce memory usage.
"""

it = zip(*iterables)
if self.max_workers == 1:
if self._max_workers == 1:
for args in it:
yield fn(*args)
return

def create_taskset(n: int) -> set[futures.Future]:
return {self.submit(fn, *args) for args in islice(it, n)}

tasks = create_taskset(self.max_workers * 5)
tasks = create_taskset(self._max_workers * 5)
while tasks:
done, tasks = futures.wait(tasks, return_when=futures.FIRST_COMPLETED)
for fut in done:
yield fut.result()
tasks.update(create_taskset(len(done)))

def shutdown(self, wait=True, *, cancel_futures=False):
if sys.version_info > (3, 9):
return super().shutdown(wait=wait, cancel_futures=cancel_futures)
else: # noqa: RET505
with self._shutdown_lock:
self._shutdown = True
if cancel_futures:
# Drain all work items from the queue, and then cancel their
# associated futures.
while True:
try:
work_item = self._work_queue.get_nowait()
except queue.Empty:
break
if work_item is not None:
work_item.future.cancel()

# Send a wake-up to prevent threads calling
# _work_queue.get(block=True) from permanently blocking.
self._work_queue.put(None) # type: ignore[arg-type]
if wait:
for t in self._threads:
t.join()

def __exit__(self, exc_type, exc_val, exc_tb):
if self._cancel_on_error:
self.shutdown(wait=True, cancel_futures=exc_val is not None)
else:
self.shutdown(wait=True)
cancel_futures = self._cancel_on_error and exc_val is not None
self.shutdown(wait=True, cancel_futures=cancel_futures)
return False


Expand Down
6 changes: 3 additions & 3 deletions src/dvc_objects/fs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ def is_empty(self, path: AnyFSPath) -> bool:
return entry["size"] == 0

@overload
def open( # noqa: A003
def open(
self,
path: AnyFSPath,
mode: Literal["rb", "br", "wb"],
Expand All @@ -305,15 +305,15 @@ def open( # noqa: A003
return self.open(path, mode, **kwargs)

@overload
def open( # noqa: A003
def open(
self,
path: AnyFSPath,
mode: Literal["r", "rt", "w"] = "r",
**kwargs: Any,
) -> "TextIO":
...

def open( # noqa: A003
def open(
self,
path: AnyFSPath,
mode: str = "r",
Expand Down
2 changes: 1 addition & 1 deletion src/dvc_objects/fs/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def copy(self, path1, path2, recursive=False, on_error=None, **kwargs):
self.rm_file(tmp_info)
raise

def open(self, path, mode="r", encoding=None, **kwargs): # noqa: A003
def open(self, path, mode="r", encoding=None, **kwargs):
return open(path, mode=mode, encoding=encoding) # noqa: SIM115

def symlink(self, path1, path2):
Expand Down

0 comments on commit 2448fd1

Please sign in to comment.