From 40cb3a41dbc8c1dbcc4210d77597ba447c2e84a5 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Tue, 16 Jul 2024 16:48:27 +0200 Subject: [PATCH] Added `DBFSPath` as `os.PathLike` implementation (#131) This PR extends the existing `WorkspacePath` support with a pathlib-like implementation for DBFS paths: `DBFSPath`. Incidental changes include: - Type-hinting the implementation and interfaces to assist with code linting. --- src/databricks/labs/blueprint/paths.py | 562 +++++++++++++++++-------- tests/integration/test_paths.py | 113 ++++- tests/unit/test_paths.py | 22 +- 3 files changed, 501 insertions(+), 196 deletions(-) diff --git a/src/databricks/labs/blueprint/paths.py b/src/databricks/labs/blueprint/paths.py index f31cec8..3db4d42 100644 --- a/src/databricks/labs/blueprint/paths.py +++ b/src/databricks/labs/blueprint/paths.py @@ -1,21 +1,25 @@ from __future__ import annotations import abc +import builtins import fnmatch +import io import locale import logging import os import posixpath import re +import shutil from abc import abstractmethod -from collections.abc import Iterable, Sequence +from collections.abc import Generator, Iterable, Sequence from io import BytesIO, StringIO from pathlib import Path, PurePath from typing import NoReturn, TypeVar from urllib.parse import quote_from_bytes as urlquote_from_bytes from databricks.sdk import WorkspaceClient -from databricks.sdk.errors import DatabricksError, NotFound +from databricks.sdk.errors import DatabricksError, NotFound, ResourceDoesNotExist +from databricks.sdk.service.files import FileInfo from databricks.sdk.service.workspace import ( ExportFormat, ImportFormat, @@ -61,8 +65,11 @@ def __init__(self, ws: WorkspaceClient, path: str): StringIO.__init__(self) -class WorkspacePath(Path): # pylint: disable=too-many-public-methods - """Experimental implementation of pathlib.Path for Databricks Workspace.""" +P = TypeVar("P", bound="_DatabricksPath") + + +class _DatabricksPath(Path, abc.ABC): # pylint: 
disable=too-many-public-methods + """Base-class for experimental pathlib.Path implementations covering Databricks Workspace paths and DBFS.""" # Implementation notes: # - The builtin Path classes are not designed for extension, which in turn makes everything a little cumbersome. @@ -77,12 +84,14 @@ class WorkspacePath(Path): # pylint: disable=too-many-public-methods # 1. Flavour has been replaced with posixpath and ntpath (normally imported as os.path). Still class-scoped. # 2. Accessor has been replaced with inline implementations based directly on the 'os' module. # - # This implementation for Workspace paths does the following: + # This implementation for Databricks-style paths does the following: # 1. Flavour is basically posix-style, with the caveat that we don't bother with the special //-prefix handling. - # 2. The Accessor is delegated to existing routines available via the workspace client. + # 2. The Accessor is replaced by delegation to existing routines available via the workspace client. # 3. Python 3.12 introduces some new API elements. Because these are source-compatible with earlier versions # these are forward-ported and implemented. # + # The current class hierarchy implements behaviour (and differentiation) by inheritance rather than composition. + # __slots__ = ( # pylint: disable=redefined-slots-in-subclass # For us this is always the empty string. Consistent with the superclass attribute for Python 3.10-3.13b. "_drv", @@ -98,12 +107,9 @@ class WorkspacePath(Path): # pylint: disable=too-many-public-methods "_hash", # The workspace client that we use to perform I/O operations on the path. "_ws", - # The cached _object_info value for the instance. - "_cached_object_info", ) - _cached_object_info: ObjectInfo - - _SUFFIXES = {".py": Language.PYTHON, ".sql": Language.SQL, ".scala": Language.SCALA, ".R": Language.R} + _str: str + _hash: int # Path semantics are posix-like. 
parser = posixpath @@ -111,6 +117,8 @@ class WorkspacePath(Path): # pylint: disable=too-many-public-methods # Compatibility attribute, for when superclass implementations get invoked on python <= 3.11. _flavour = object() + # Public APIs that we don't support. + as_uri = _na("as_uri") cwd = _na("cwd") stat = _na("stat") chmod = _na("chmod") @@ -125,11 +133,11 @@ class WorkspacePath(Path): # pylint: disable=too-many-public-methods link_to = _na("link_to") samefile = _na("samefile") - def __new__(cls, *args, **kwargs) -> WorkspacePath: + def __new__(cls, *args, **kwargs): # Force all initialisation to go via __init__() irrespective of the (Python-specific) base version. return object.__new__(cls) - def __init__(self, ws: WorkspaceClient, *args) -> None: # pylint: disable=super-init-not-called,useless-suppression + def __init__(self, ws: WorkspaceClient, *args: str | bytes | os.PathLike) -> None: # We deliberately do _not_ call the super initializer because we're taking over complete responsibility for the # implementation of the public API. @@ -138,24 +146,13 @@ def __init__(self, ws: WorkspaceClient, *args) -> None: # pylint: disable=super # Normalise the paths that we have. 
root, path_parts = self._parse_and_normalize(raw_paths) - self._drv = "" self._root = root self._path_parts = path_parts self._ws = ws - @classmethod - def _from_object_info(cls, ws: WorkspaceClient, object_info: ObjectInfo): - """Special (internal-only) constructor that creates an instance based on ObjectInfo.""" - if not object_info.path: - msg = f"Cannot initialise within object path: {object_info}" - raise ValueError(msg) - path = cls(ws, object_info.path) - path._cached_object_info = object_info - return path - @staticmethod - def _to_raw_paths(*args) -> list[str]: + def _to_raw_paths(*args: str | bytes | os.PathLike) -> list[str]: raw_paths: list[str] = [] for arg in args: if isinstance(arg, PurePath): @@ -208,9 +205,49 @@ def _splitroot(cls, part: str, sep: str) -> tuple[str, str]: return sep, part.lstrip(sep) return "", part + @abstractmethod + def as_fuse(self) -> Path: ... + + @abstractmethod + def exists(self, *, follow_symlinks: bool = True) -> bool: ... + + @abstractmethod + def _mkdir(self) -> None: ... + + @abstractmethod + def rmdir(self, recursive: bool = False) -> None: ... + + @abstractmethod + def unlink(self, missing_ok: bool = False) -> None: ... + + @abstractmethod + def open( + self, + mode: str = "r", + buffering: int = -1, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, + ): ... + + @abstractmethod + def is_dir(self) -> bool: ... + + @abstractmethod + def is_file(self) -> bool: ... + + @abstractmethod + def rename(self: P, target: str | bytes | os.PathLike) -> P: ... + + @abstractmethod + def replace(self: P, target: str | bytes | os.PathLike) -> P: ... + + @abstractmethod + def iterdir(self: P) -> Generator[P, None, None]: ... + def __reduce__(self) -> NoReturn: # Cannot support pickling because we can't pickle the workspace client. - msg = "Pickling Workspace paths is not supported." + msg = f"Pickling {self.__class__.__qualname__} paths is not supported." 
raise NotImplementedError(msg) def __fspath__(self): @@ -224,34 +261,31 @@ def __fspath__(self): # - PEP 519 (https://peps.python.org/pep-0519/) # - os.fspath (https://docs.python.org/3/library/os.html#os.fspath) # TODO: Allow this to work when within an appropriate Databricks Runtime that mounts Workspace paths via FUSE. - msg = f"Workspace paths are not path-like: {self}" + msg = f"{self.__class__.__qualname__} paths are not path-like: {self}" raise NotImplementedError(msg) - def as_posix(self): + def as_posix(self) -> str: return str(self) - def __str__(self): + def __str__(self) -> str: try: return self._str except AttributeError: self._str = (self._root + self.parser.sep.join(self._path_parts)) or "." return self._str - def __bytes__(self): + def __bytes__(self) -> bytes: return str(self).encode("utf-8") - def __repr__(self): - return f"{self.__class__.__name__}({str(self)!r})" - - def as_uri(self) -> str: - return f"{self._ws.config.host}#workspace{urlquote_from_bytes(bytes(self))}" + def __repr__(self) -> str: + return f"{self.__class__.__qualname__}({str(self)!r})" - def __eq__(self, other): + def __eq__(self, other) -> bool: if not isinstance(other, type(self)): return NotImplemented return str(self) == str(other) - def __hash__(self): + def __hash__(self) -> int: try: return self._hash except AttributeError: @@ -284,53 +318,53 @@ def _raw_paths(self) -> NoReturn: # See __rtruediv__ for more information. 
raise TypeError("trigger NotImplemented") - def __lt__(self, other): + def __lt__(self, other) -> bool: if not isinstance(other, type(self)): return NotImplemented return self._path_parts < other._path_parts - def __le__(self, other): + def __le__(self, other) -> bool: if not isinstance(other, type(self)): return NotImplemented return self._path_parts <= other._path_parts - def __gt__(self, other): + def __gt__(self, other) -> bool: if not isinstance(other, type(self)): return NotImplemented return self._path_parts > other._path_parts - def __ge__(self, other): + def __ge__(self, other) -> bool: if not isinstance(other, type(self)): return NotImplemented return self._path_parts >= other._path_parts - def with_segments(self, *pathsegments): - return type(self)(self._ws, *pathsegments) + def with_segments(self: P, *path_segments: bytes | str | os.PathLike) -> P: + return type(self)(self._ws, *path_segments) @property def drive(self) -> str: return self._drv @property - def root(self): + def root(self) -> str: return self._root @property - def anchor(self): + def anchor(self) -> str: return self.drive + self.root @property - def name(self): + def name(self) -> str: path_parts = self._path_parts return path_parts[-1] if path_parts else "" @property - def parts(self): + def parts(self) -> tuple[str, ...]: if self.drive or self.root: return self.drive + self.root, *self._path_parts return self._path_parts - def with_name(self, name): + def with_name(self: P, name: str) -> P: parser = self.parser if not name or parser.sep in name or name == ".": msg = f"Invalid name: {name!r}" @@ -341,7 +375,7 @@ def with_name(self, name): path_parts[-1] = name return type(self)(self._ws, self.anchor, *path_parts) - def with_suffix(self, suffix): + def with_suffix(self: P, suffix: str) -> P: stem = self.stem if not stem: msg = f"{self!r} has an empty name" @@ -351,13 +385,13 @@ def with_suffix(self, suffix): raise ValueError(msg) return self.with_name(stem + suffix) - def 
relative_to(self, other, *more_other, walk_up=False): # pylint: disable=arguments-differ - other = self.with_segments(other, *more_other) - if self.anchor != other.anchor: - msg = f"{str(self)!r} and {str(other)!r} have different anchors" + def relative_to(self: P, *other: str | bytes | os.PathLike, walk_up: bool = False) -> P: + normalized = self.with_segments(*other) + if self.anchor != normalized.anchor: + msg = f"{str(self)!r} and {str(normalized)!r} have different anchors" raise ValueError(msg) path_parts0 = self._path_parts - path_parts1 = other._path_parts # pylint: disable=protected-access + path_parts1 = normalized._path_parts # pylint: disable=protected-access # Find the length of the common prefix. i = 0 while i < len(path_parts0) and i < len(path_parts1) and path_parts0[i] == path_parts1[i]: @@ -366,29 +400,29 @@ def relative_to(self, other, *more_other, walk_up=False): # pylint: disable=arg # Handle walking up. if i < len(path_parts1): if not walk_up: - msg = f"{str(self)!r} is not in the subpath of {str(other)!r}" + msg = f"{str(self)!r} is not in the subpath of {str(normalized)!r}" raise ValueError(msg) if ".." in path_parts1[i:]: - raise ValueError(f"'..' segment in {str(other)!r} cannot be walked") + raise ValueError(f"'..' 
segment in {str(normalized)!r} cannot be walked") walkup_parts = [".."] * (len(path_parts1) - i) relative_parts = (*walkup_parts, *relative_parts) return self.with_segments("", *relative_parts) - def is_relative_to(self, other, *more_other): # pylint: disable=arguments-differ - other = self.with_segments(other, *more_other) - if self.anchor != other.anchor: + def is_relative_to(self, *other: str | bytes | os.PathLike) -> bool: + normalized = self.with_segments(*other) + if self.anchor != normalized.anchor: return False path_parts0 = self._path_parts - path_parts1 = other._path_parts # pylint: disable=protected-access + path_parts1 = normalized._path_parts # pylint: disable=protected-access return path_parts0[: len(path_parts1)] == path_parts1 @property - def parent(self): + def parent(self: P) -> P: rel_path = self._path_parts return self.with_segments(self.anchor, *rel_path[:-1]) if rel_path else self @property - def parents(self): + def parents(self: P) -> tuple[P, ...]: parents = [] path = self parent = path.parent @@ -398,16 +432,16 @@ def parents(self): parent = path.parent return tuple(parents) - def is_absolute(self): + def is_absolute(self) -> bool: return bool(self.anchor) - def is_reserved(self): + def is_reserved(self) -> bool: return False - def joinpath(self, *pathsegments): - return self.with_segments(self, *pathsegments) + def joinpath(self: P, *path_segments) -> P: + return self.with_segments(self, *path_segments) - def __truediv__(self, other): + def __truediv__(self: P, other: str | bytes | os.PathLike) -> P: try: return self.with_segments(*self._parts(), other) except TypeError: @@ -415,7 +449,7 @@ def __truediv__(self, other): def __rtruediv__(self, other): # Note: this is only invoked if __truediv__ has already returned NotImplemented. - # For the case of Path / WorkspacePath this means the underlying __truediv__ is invoked. + # For the case of Path / _DatabricksPath this means the underlying __truediv__ is invoked. 
# The base-class implementations all access internals but yield NotImplemented if TypeError is raised. As # such we stub those internals (_from_parts and _raw_path) to trigger the NotImplemented path and ensure that # control ends up here. @@ -426,7 +460,7 @@ def __rtruediv__(self, other): except TypeError: return NotImplemented - def match(self, path_pattern, *, case_sensitive=None): + def match(self, path_pattern: str | bytes | os.PathLike, *, case_sensitive: bool | None = None) -> bool: # Convert the pattern to a fake path (with globs) to help with matching parts. if not isinstance(path_pattern, PurePath): path_pattern = self.with_segments(path_pattern) @@ -448,63 +482,313 @@ def match(self, path_pattern, *, case_sensitive=None): return False return True - def as_fuse(self): + def home(self): # pylint: disable=arguments-differ + """Return the user's home directory. Adapted from pathlib.Path""" + return type(self)(self._ws, "~").expanduser() + + def expanduser(self: P) -> P: + # Expand ~ (but NOT ~user) constructs. 
+ if not (self._drv or self._root) and self._path_parts and self._path_parts[0][:1] == "~": + if self._path_parts[0] == "~": + user_name = self._ws.current_user.me().user_name + else: + other_user = self._path_parts[0][1:] + msg = f"Cannot determine home directory for: {other_user}" + raise RuntimeError(msg) + if user_name is None: + raise RuntimeError("Could not determine home directory.") + homedir = f"/Users/{user_name}" + return self.with_segments(homedir, *self._path_parts[1:]) + return self + + def _return_false(self) -> bool: + return False + + is_symlink = _return_false + is_block_device = _return_false + is_char_device = _return_false + is_fifo = _return_false + is_socket = _return_false + is_mount = _return_false + is_junction = _return_false + + def resolve(self: P, strict: bool = False) -> P: + """Return the absolute path of the file or directory in Databricks Workspace.""" + absolute = self.absolute() + if strict and not absolute.exists(): + msg = f"Path does not exist: {self}" + raise FileNotFoundError(msg) + return absolute + + def absolute(self: P) -> P: + if self.is_absolute(): + return self + return self.with_segments(self.cwd(), self) + + def mkdir(self, mode: int = 0o600, parents: bool = True, exist_ok: bool = True) -> None: + """Create a directory;Only mode 0o600 is supported.""" + if not exist_ok: + raise ValueError("exist_ok must be True for Databricks Workspace") + if not parents: + raise ValueError("parents must be True for Databricks Workspace") + if mode != 0o600: + raise ValueError("other modes than 0o600 are not yet supported") + self._mkdir() + + def _prepare_pattern(self, pattern: str | bytes | os.PathLike) -> Sequence[str]: + if not pattern: + raise ValueError("Glob pattern must not be empty.") + parsed_pattern = self.with_segments(pattern) + if parsed_pattern.anchor: + msg = f"Non-relative patterns are unsupported: {pattern!s}" + raise NotImplementedError(msg) + pattern_parts = parsed_pattern._path_parts # pylint: 
disable=protected-access + if ".." in pattern_parts: + msg = f"Parent traversal is not supported: {pattern!s}" + raise ValueError(msg) + if os.fspath(pattern)[-1] == self.parser.sep: + pattern_parts = (*pattern_parts, "") + return pattern_parts + + def glob( + self: P, + pattern: str | bytes | os.PathLike, + *, + case_sensitive: bool | None = None, + ) -> Generator[P, None, None]: + pattern_parts = self._prepare_pattern(pattern) + if case_sensitive is None: + case_sensitive = True + selector = _Selector.parse(pattern_parts, case_sensitive=case_sensitive) + yield from selector(self) + + def rglob( + self: P, + pattern: str | bytes | os.PathLike, + *, + case_sensitive: bool | None = None, + ) -> Generator[P, None, None]: + pattern_parts = ("**", *self._prepare_pattern(pattern)) + if case_sensitive is None: + case_sensitive = True + selector = _Selector.parse(pattern_parts, case_sensitive=case_sensitive) + yield from selector(self) + + +class DBFSPath(_DatabricksPath): + """Experimental implementation of pathlib.Path for DBFS paths.""" + + __slots__ = ( + # The cached _file_info value for the instance. + "_cached_file_info", + ) + _cached_file_info: FileInfo + + @classmethod + def _from_file_info(cls, ws: WorkspaceClient, file_info: FileInfo) -> DBFSPath: + """Special (internal-only) constructor that creates an instance based on FileInfo.""" + if not file_info.path: + msg = f"Cannot initialise without file path: {file_info}" + raise ValueError(msg) + path = cls(ws, file_info.path) + path._cached_file_info = file_info + return path + + def as_fuse(self) -> Path: """Return FUSE-mounted path in Databricks Runtime.""" if "DATABRICKS_RUNTIME_VERSION" not in os.environ: logger.warning("This method is only available in Databricks Runtime") - return Path("/Workspace", self.as_posix().lstrip("/")) + return Path("/dbfs", self.as_posix().lstrip("/")) - def home(self): # pylint: disable=arguments-differ - """Return the user's home directory. 
Adapted from pathlib.Path""" - return WorkspacePath(self._ws, "~").expanduser() + def exists(self, *, follow_symlinks: bool = True) -> bool: + """Return True if the path points to an existing file, directory, or notebook""" + if not follow_symlinks: + raise NotImplementedError("follow_symlinks=False is not supported for DBFS") + try: + self._cached_file_info = self._ws.dbfs.get_status(self.as_posix()) + return True + except NotFound: + return False + + def _mkdir(self) -> None: + self._ws.dbfs.mkdirs(self.as_posix()) + + def rmdir(self, recursive: bool = False) -> None: + """Remove a DBFS directory""" + self._ws.dbfs.delete(self.as_posix(), recursive=recursive) + + def rename(self: P, target: str | bytes | os.PathLike) -> P: + """Rename this path as the target, unless the target already exists.""" + dst = self.with_segments(target) + self._ws.dbfs.move(self.as_posix(), dst.as_posix()) + return dst + + def replace(self: P, target: str | bytes | os.PathLike) -> P: + """Rename this path, overwriting the target if it exists and can be overwritten.""" + dst = self.with_segments(target) + if self.is_dir(): + msg = f"DBFS directories cannot currently be replaced: {self} -> {dst}" + raise ValueError(msg) + # Can't use self._ws.dbfs.move_(): it doesn't honour the overwrite flag properly. + with dst.open(mode="wb") as writer, self.open(mode="rb") as reader: + shutil.copyfileobj(reader, writer, length=1024 * 1024) + self.unlink() + return dst + + def unlink(self, missing_ok: bool = False) -> None: + """Remove a file in Databricks Workspace.""" + # Although this introduces a race-condition, we have to handle missing_ok in advance because the DBFS client + # doesn't report any error if deleting a target that doesn't exist. 
+ if not missing_ok and not self.exists(): + raise FileNotFoundError(f"{self.as_posix()} does not exist") + self._ws.dbfs.delete(self.as_posix()) + + def open( + self, + mode: str = "r", + buffering: int = -1, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, + ): + """Open a DBFS file. + + Only text and binary I/O are supported in basic read or write mode, along with 'x' to avoid overwriting.""" + is_write = "w" in mode + is_read = "r" in mode or not is_write + if is_read and is_write: + msg = f"Unsupported mode: {mode} (simultaneous read and write)" + raise ValueError(msg) + is_binary = "b" in mode + is_text = "t" in mode or not is_binary + if is_binary and is_text: + msg = f"Unsupported mode: {mode} (binary and text I/O)" + raise ValueError(msg) + is_overwrite = is_write and "x" not in mode + binary_io = self._ws.dbfs.open(self.as_posix(), read=is_read, write=is_write, overwrite=is_overwrite) + if is_text: + return io.TextIOWrapper(binary_io, encoding=encoding, errors=errors, newline=newline) + return binary_io + + def write_bytes(self, data): + """Write the (binary) data to this path.""" + # The DBFS BinaryIO implementation only accepts bytes and rejects the (other) byte-like builtins. + match data: + case builtins.bytes | builtins.bytearray: + binary_data = bytes(data) + case _: + binary_data = bytes(memoryview(data)) + with self.open("wb") as f: + return f.write(binary_data) + + @property + def _file_info(self) -> FileInfo: + # this method is cached because it is used in multiple is_* methods. + # DO NOT use this method in methods, where fresh result is required. 
+ try: + return self._cached_file_info + except AttributeError: + self._cached_file_info = self._ws.dbfs.get_status(self.as_posix()) + return self._cached_file_info + + def is_dir(self) -> bool: + """Return True if the path points to a DBFS directory.""" + try: + return bool(self._file_info.is_dir) + except DatabricksError: + return False + + def is_file(self) -> bool: + """Return True if the path points to a file in Databricks Workspace.""" + return not self.is_dir() + + def iterdir(self) -> Generator[DBFSPath, None, None]: + for child in self._ws.dbfs.list(self.as_posix()): + yield self._from_file_info(self._ws, child) + + +class WorkspacePath(_DatabricksPath): + """Experimental implementation of pathlib.Path for Databricks Workspace.""" - def exists(self, *, follow_symlinks=True): + __slots__ = ( + # The cached _object_info value for the instance. + "_cached_object_info", + ) + _cached_object_info: ObjectInfo + + _SUFFIXES = {".py": Language.PYTHON, ".sql": Language.SQL, ".scala": Language.SCALA, ".R": Language.R} + + @classmethod + def _from_object_info(cls, ws: WorkspaceClient, object_info: ObjectInfo) -> WorkspacePath: + """Special (internal-only) constructor that creates an instance based on ObjectInfo.""" + if not object_info.path: + msg = f"Cannot initialise without object path: {object_info}" + raise ValueError(msg) + path = cls(ws, object_info.path) + path._cached_object_info = object_info + return path + + def as_uri(self) -> str: + return f"{self._ws.config.host}#workspace{urlquote_from_bytes(bytes(self))}" + + def as_fuse(self) -> Path: + """Return FUSE-mounted path in Databricks Runtime.""" + if "DATABRICKS_RUNTIME_VERSION" not in os.environ: + logger.warning("This method is only available in Databricks Runtime") + return Path("/Workspace", self.as_posix().lstrip("/")) + + def exists(self, *, follow_symlinks: bool = True) -> bool: """Return True if the path points to an existing file, directory, or notebook""" if not follow_symlinks: raise 
NotImplementedError("follow_symlinks=False is not supported for Databricks Workspace") try: - self._ws.workspace.get_status(self.as_posix()) + self._cached_object_info = self._ws.workspace.get_status(self.as_posix()) return True except NotFound: return False - def mkdir(self, mode=0o600, parents=True, exist_ok=True): - """Create a directory in Databricks Workspace. Only mode 0o600 is supported.""" - if not exist_ok: - raise ValueError("exist_ok must be True for Databricks Workspace") - if not parents: - raise ValueError("parents must be True for Databricks Workspace") - if mode != 0o600: - raise ValueError("other modes than 0o600 are not yet supported") + def _mkdir(self) -> None: self._ws.workspace.mkdirs(self.as_posix()) - def rmdir(self, recursive=False): + def rmdir(self, recursive: bool = False) -> None: """Remove a directory in Databricks Workspace""" self._ws.workspace.delete(self.as_posix(), recursive=recursive) - def _rename(self, target, overwrite: bool): - """Rename a file or directory in Databricks Workspace""" + def _rename(self: P, target: str | bytes | os.PathLike, overwrite: bool) -> P: + """Rename a file in Databricks Workspace""" dst = self.with_segments(target) + if self.is_dir(): + msg = f"Workspace directories cannot currently be renamed: {self} -> {dst}" + raise ValueError(msg) with self._ws.workspace.download(self.as_posix(), format=ExportFormat.AUTO) as f: self._ws.workspace.upload(dst.as_posix(), f.read(), format=ImportFormat.AUTO, overwrite=overwrite) self.unlink() return dst - def rename(self, target): - """Rename a file or directory in Databricks Workspace, failing if the target already exists.""" + def rename(self, target: str | bytes | os.PathLike): + """Rename this path as the target, unless the target already exists.""" return self._rename(target, overwrite=False) - def replace(self, target): - """Rename a file or directory in Databricks Workspace, overwriting the target if it exists.""" + def replace(self, target: str | bytes | 
os.PathLike): + """Rename this path, overwriting the target if it exists and can be overwritten.""" return self._rename(target, overwrite=True) - def unlink(self, missing_ok=False): + def unlink(self, missing_ok: bool = False) -> None: """Remove a file in Databricks Workspace.""" - if not missing_ok and not self.exists(): - raise FileNotFoundError(f"{self.as_posix()} does not exist") - self._ws.workspace.delete(self.as_posix()) - - def open(self, mode="r", buffering=-1, encoding=None, errors=None, newline=None): + try: + self._ws.workspace.delete(self.as_posix()) + except ResourceDoesNotExist as e: + if not missing_ok: + raise FileNotFoundError(f"{self.as_posix()} does not exist") from e + + def open( + self, + mode: str = "r", + buffering: int = -1, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, + ): """Open a file in Databricks Workspace. Only text and binary modes are supported.""" if encoding is None or encoding == "locale": encoding = locale.getpreferredencoding(False) @@ -520,7 +804,7 @@ def open(self, mode="r", buffering=-1, encoding=None, errors=None, newline=None) raise ValueError(f"invalid mode: {mode}") @property - def suffix(self): + def suffix(self) -> str: """Return the file extension. 
If the file is a notebook, return the suffix based on the language.""" suffix = super().suffix if suffix: @@ -545,99 +829,31 @@ def _object_info(self) -> ObjectInfo: self._cached_object_info = self._ws.workspace.get_status(self.as_posix()) return self._object_info - def _return_false(self) -> bool: - return False - - is_symlink = _return_false - is_block_device = _return_false - is_char_device = _return_false - is_fifo = _return_false - is_socket = _return_false - is_mount = _return_false - is_junction = _return_false - - def resolve(self, strict=False): - """Return the absolute path of the file or directory in Databricks Workspace.""" - absolute = self.absolute() - if strict and not absolute.exists(): - msg = f"Path does not exist: {self}" - raise FileNotFoundError(msg) - return absolute - - def absolute(self): - if self.is_absolute(): - return self - return self.with_segments(self.cwd(), self) - - def is_dir(self): + def is_dir(self) -> bool: """Return True if the path points to a directory in Databricks Workspace.""" try: return self._object_info.object_type == ObjectType.DIRECTORY except DatabricksError: return False - def is_file(self): + def is_file(self) -> bool: """Return True if the path points to a file in Databricks Workspace.""" try: return self._object_info.object_type == ObjectType.FILE except DatabricksError: return False - def expanduser(self): - # Expand ~ (but NOT ~user) constructs. 
- if not (self._drv or self._root) and self._path_parts and self._path_parts[0][:1] == "~": - if self._path_parts[0] == "~": - user_name = self._ws.current_user.me().user_name - else: - other_user = self._path_parts[0][1:] - msg = f"Cannot determine home directory for: {other_user}" - raise RuntimeError(msg) - if user_name is None: - raise RuntimeError("Could not determine home directory.") - homedir = f"/Users/{user_name}" - return self.with_segments(homedir, *self._path_parts[1:]) - return self - - def is_notebook(self): + def is_notebook(self) -> bool: """Return True if the path points to a notebook in Databricks Workspace.""" try: return self._object_info.object_type == ObjectType.NOTEBOOK except DatabricksError: return False - def iterdir(self): + def iterdir(self) -> Generator[WorkspacePath, None, None]: for child in self._ws.workspace.list(self.as_posix()): yield self._from_object_info(self._ws, child) - def _prepare_pattern(self, pattern) -> Sequence[str]: - if not pattern: - raise ValueError("Glob pattern must not be empty.") - parsed_pattern = self.with_segments(pattern) - if parsed_pattern.anchor: - msg = f"Non-relative patterns are unsupported: {pattern}" - raise NotImplementedError(msg) - pattern_parts = parsed_pattern._path_parts # pylint: disable=protected-access - if ".." 
in pattern_parts: - msg = f"Parent traversal is not supported: {pattern}" - raise ValueError(msg) - if pattern[-1] == self.parser.sep: - pattern_parts = (*pattern_parts, "") - return pattern_parts - - def glob(self, pattern, *, case_sensitive=None): - pattern_parts = self._prepare_pattern(pattern) - if case_sensitive is None: - case_sensitive = True - selector = _Selector.parse(pattern_parts, case_sensitive=case_sensitive) - yield from selector(self) - - def rglob(self, pattern, *, case_sensitive=None): - pattern_parts = ("**", *self._prepare_pattern(pattern)) - if case_sensitive is None: - case_sensitive = True - selector = _Selector.parse(pattern_parts, case_sensitive=case_sensitive) - yield from selector(self) - T = TypeVar("T", bound="Path") @@ -674,8 +890,7 @@ def parse(cls, pattern_parts: Sequence[str], *, case_sensitive: bool) -> _Select raise ValueError(f"Glob pattern unsupported: {pattern_parts}") @abstractmethod - def __call__(self, path: T) -> Iterable[T]: - raise NotImplementedError() + def __call__(self, path: T) -> Iterable[T]: ... class _TerminalSelector(_Selector): @@ -703,8 +918,7 @@ def __call__(self, path: T) -> Iterable[T]: yield from self._select_children(path) @abstractmethod - def _select_children(self, path: T) -> Iterable[T]: - raise NotImplementedError() + def _select_children(self, path: T) -> Iterable[T]: ... 
class _LiteralSelector(_NonTerminalSelector): diff --git a/tests/integration/test_paths.py b/tests/integration/test_paths.py index f741be0..0b6b2c3 100644 --- a/tests/integration/test_paths.py +++ b/tests/integration/test_paths.py @@ -1,12 +1,12 @@ from pathlib import Path import pytest -from databricks.sdk.errors import BadRequest +from databricks.sdk.errors import BadRequest, ResourceAlreadyExists -from databricks.labs.blueprint.paths import WorkspacePath +from databricks.labs.blueprint.paths import DBFSPath, WorkspacePath -# Currently: WorkspacePath, later: DBFSPath and VolumePath -DATABRICKS_PATHLIKE = [WorkspacePath] +# Currently: DBFSPath, WorkspacePath, later: VolumePath +DATABRICKS_PATHLIKE = [DBFSPath, WorkspacePath] @pytest.mark.parametrize("cls", DATABRICKS_PATHLIKE) @@ -38,7 +38,7 @@ def test_mkdirs(ws, make_random, cls): wsp_check = cls(ws, f"/Users/{user_name}/{name}/foo/bar/baz") assert wsp_check.is_dir() - with pytest.raises(BadRequest): + with pytest.raises(expected_exception=(BadRequest, OSError)): wsp_check.parent.rmdir() wsp_check.parent.rmdir(recursive=True) @@ -66,6 +66,28 @@ def test_open_text_io(ws, make_random, cls): assert not hello_txt.exists() +@pytest.mark.parametrize("cls", DATABRICKS_PATHLIKE) +def test_unlink(ws, make_random, cls): + name = make_random() + tmp_dir = cls(ws, f"~/{name}").expanduser() + tmp_dir.mkdir() + try: + # Check unlink() interactions with a file that exists. + some_file = tmp_dir / "some-file.txt" + some_file.write_text("Some text") + assert some_file.exists() and some_file.is_file() + some_file.unlink() + assert not some_file.exists() + + # And now the interactions with a missing file. 
+ missing_file = tmp_dir / "missing-file.txt" + missing_file.unlink(missing_ok=True) + with pytest.raises(FileNotFoundError): + missing_file.unlink(missing_ok=False) + finally: + tmp_dir.rmdir(recursive=True) + + @pytest.mark.parametrize("cls", DATABRICKS_PATHLIKE) def test_open_binary_io(ws, make_random, cls): name = make_random() @@ -84,28 +106,85 @@ def test_open_binary_io(ws, make_random, cls): @pytest.mark.parametrize("cls", DATABRICKS_PATHLIKE) -def test_replace(ws, make_random, cls): +def test_rename_file(ws, make_random, cls): name = make_random() - wsp = cls(ws, f"~/{name}") - with_user = wsp.expanduser() - with_user.mkdir(parents=True) - - hello_txt = with_user / "hello.txt" - hello_txt.write_text("Hello, World!") + tmp_dir = cls(ws, f"~/{name}").expanduser() + tmp_dir.mkdir() + try: + # Test renaming a file when the target doesn't exist. + src_file = tmp_dir / "src.txt" + src_file.write_text("Some content") + dst_file = src_file.rename(src_file.with_name("dst.txt")) + expected_file = tmp_dir / "dst.txt" + assert dst_file == expected_file and expected_file.is_file() + + # Test renaming a file when the target already exists. + exists_file = tmp_dir / "already-exists.txt" + exists_file.write_text("Existing file.") + with pytest.raises(ResourceAlreadyExists): + _ = dst_file.rename(exists_file) + assert expected_file.exists() and expected_file.is_file() # Check it's still there. + finally: + tmp_dir.rmdir(recursive=True) + + +def test_rename_directory(ws, make_random): + # The Workspace client doesn't currently support renaming directories so we only test DBFS. + name = make_random() + tmp_dir = DBFSPath(ws, f"~/{name}").expanduser() + tmp_dir.mkdir() + try: + # Test renaming a directory (with content) when the target doesn't exist. 
+ src_dir = tmp_dir / "src-dir" + src_dir.mkdir() + (src_dir / "content.txt").write_text("Source content.") + dst_dir = src_dir.rename(src_dir.with_name("dst-dir")) + expected_dir = tmp_dir / "dst-dir" + assert dst_dir == expected_dir and expected_dir.is_dir() and (expected_dir / "content.txt").is_file() + + # Test renaming a directory (with content) when the target already exists. + exists_dir = tmp_dir / "existing-dir" + exists_dir.mkdir() + with pytest.raises(ResourceAlreadyExists): + _ = dst_dir.rename(exists_dir) + assert expected_dir.exists() and expected_dir.is_dir() # Check it's still there. + finally: + tmp_dir.rmdir(recursive=True) - hello_txt.replace(with_user / "hello2.txt") - assert not hello_txt.exists() - assert (with_user / "hello2.txt").read_text() == "Hello, World!" +@pytest.mark.parametrize("cls", DATABRICKS_PATHLIKE) +def test_replace_file(ws, make_random, cls): + name = make_random() + tmp_dir = cls(ws, f"~/{name}").expanduser() + tmp_dir.mkdir() + try: + # Test replacing a file when the target doesn't exist. + src_file = tmp_dir / "src.txt" + src_file.write_text("Some content") + dst_file = src_file.replace(src_file.with_name("dst.txt")) + expected_file = tmp_dir / "dst.txt" + assert dst_file == expected_file and expected_file.is_file() + + # Test replacing a file when the target already exists. 
+ exists_file = tmp_dir / "already-exists.txt" + exists_file.write_text("Existing file.") + replaced_file = dst_file.replace(exists_file) + assert replaced_file.is_file() and replaced_file.read_text() == "Some content" + finally: + tmp_dir.rmdir(recursive=True) def test_workspace_as_fuse(ws): - # WSFS and DBFS have different root paths wsp = WorkspacePath(ws, "/Users/foo/bar/baz") assert Path("/Workspace/Users/foo/bar/baz") == wsp.as_fuse() -def test_as_uri(ws): +def test_dbfs_as_fuse(ws): + p = DBFSPath(ws, "/Users/foo/bar/baz") + assert Path("/dbfs/Users/foo/bar/baz") == p.as_fuse() + + +def test_workspace_as_uri(ws): # DBFS is not exposed via browser wsp = WorkspacePath(ws, "/Users/foo/bar/baz") assert wsp.as_uri() == f"{ws.config.host}#workspace/Users/foo/bar/baz" diff --git a/tests/unit/test_paths.py b/tests/unit/test_paths.py index f618064..7a30b90 100644 --- a/tests/unit/test_paths.py +++ b/tests/unit/test_paths.py @@ -5,7 +5,7 @@ import pytest from databricks.sdk import WorkspaceClient -from databricks.sdk.errors import NotFound +from databricks.sdk.errors import NotFound, ResourceDoesNotExist from databricks.sdk.mixins.workspace import WorkspaceExt from databricks.sdk.service.workspace import ( ImportFormat, @@ -97,7 +97,7 @@ def test_hash() -> None: ("foo", "foo/bar", "foo/baz"), ], ) -def test_comparison(increasing_paths: tuple[str | list[str], str | list[str], str | list[str]]) -> None: +def test_comparison(increasing_paths: tuple[str, str, str]) -> None: """Test that comparing paths works as expected.""" ws = create_autospec(WorkspaceClient) @@ -160,7 +160,7 @@ def test_pathlike_error() -> None: ws = create_autospec(WorkspaceClient) p = WorkspacePath(ws, "/some/path") - with pytest.raises(NotImplementedError, match="Workspace paths are not path-like"): + with pytest.raises(NotImplementedError, match="WorkspacePath paths are not path-like"): _ = os.fspath(p) @@ -424,10 +424,21 @@ def test_iterdir() -> None: def test_exists_when_path_exists() -> None: ws 
= create_autospec(WorkspaceClient) workspace_path = WorkspacePath(ws, "/test/path") - ws.workspace.get_status.return_value = True + ws.workspace.get_status.return_value = ObjectInfo(path="/test/path") assert workspace_path.exists() +def test_exists_caches_info() -> None: + ws = create_autospec(WorkspaceClient) + workspace_path = WorkspacePath(ws, "/test/path") + ws.workspace.get_status.return_value = ObjectInfo(path="/test/path", object_type=ObjectType.FILE) + _ = workspace_path.exists() + + ws.workspace.get_status.reset_mock() + _ = workspace_path.is_file() + assert not ws.workspace.get_status.called + + def test_exists_when_path_does_not_exist() -> None: ws = create_autospec(WorkspaceClient) workspace_path = WorkspacePath(ws, "/test/path") @@ -603,14 +614,15 @@ def test_replace_file() -> None: def test_unlink_existing_file() -> None: ws = create_autospec(WorkspaceClient) workspace_path = WorkspacePath(ws, "/test/path") - ws.workspace.get_status.return_value = True workspace_path.unlink() ws.workspace.delete.assert_called_once_with("/test/path") + assert not ws.workspace.get_status.called def test_unlink_non_existing_file() -> None: ws = create_autospec(WorkspaceClient) workspace_path = WorkspacePath(ws, "/test/path") + ws.workspace.delete.side_effect = ResourceDoesNotExist("Simulated ResourceDoesNotExist") ws.workspace.get_status.side_effect = NotFound("Simulated NotFound") with pytest.raises(FileNotFoundError): workspace_path.unlink()