rohmu: extract copy_files_from to super class BaseTransfer
This allows us to hook the key iteration
without an override for each provider transfer.
tilman-aiven committed Aug 21, 2024
1 parent 6b695f1 commit b6b332b
Showing 5 changed files with 44 additions and 40 deletions.
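The change replaces four near-identical per-provider implementations of `copy_files_from` with one shared loop in `BaseTransfer`. A minimal runnable sketch of the resulting shape, using the names from the diff below (`DictTransfer` is a hypothetical toy provider added only so the sketch executes; it is not part of rohmu):

```python
from typing import Collection, Dict, Optional

Metadata = dict  # stand-in for rohmu.typing.Metadata


class BaseTransfer:
    def copy_files_from(self, *, source: "BaseTransfer", keys: Collection[str]) -> None:
        # The base class now owns the key iteration; providers only supply
        # the single-object primitive _copy_file_from_bucket.
        if isinstance(source, self.__class__):
            for key in keys:
                self._copy_file_from_bucket(
                    source_bucket=source, source_key=key, destination_key=key, timeout=15
                )
        else:
            raise NotImplementedError

    def _copy_file_from_bucket(
        self,
        *,
        source_bucket: "BaseTransfer",
        source_key: str,
        destination_key: str,
        metadata: Optional[Metadata] = None,
        timeout: float = 15.0,
    ) -> None:
        raise NotImplementedError


class DictTransfer(BaseTransfer):
    """Toy in-memory provider, only to exercise the shared loop."""

    def __init__(self) -> None:
        self.objects: Dict[str, bytes] = {}

    def _copy_file_from_bucket(
        self,
        *,
        source_bucket: "DictTransfer",
        source_key: str,
        destination_key: str,
        metadata: Optional[Metadata] = None,
        timeout: float = 15.0,
    ) -> None:
        # Copy one object from the source transfer into this one.
        self.objects[destination_key] = source_bucket.objects[source_key]


src, dst = DictTransfer(), DictTransfer()
src.objects["a"] = b"payload"
dst.copy_files_from(source=src, keys=["a"])
assert dst.objects["a"] == b"payload"
```

The `isinstance(source, self.__class__)` check preserves the old per-provider behavior: copying between different backends still raises `NotImplementedError`.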
14 changes: 4 additions & 10 deletions rohmu/object_storage/azure.py
@@ -24,7 +24,6 @@
KEY_TYPE_OBJECT,
KEY_TYPE_PREFIX,
ProgressProportionCallbackType,
SourceStorageModelT,
)
from rohmu.object_storage.config import ( # noqa: F401
AZURE_ENDPOINT_SUFFIXES as ENDPOINT_SUFFIXES,
@@ -33,7 +32,8 @@
calculate_azure_max_block_size as calculate_max_block_size,
)
from rohmu.typing import Metadata
from typing import Any, BinaryIO, Collection, Iterator, Optional, Tuple, Union
from typing import Any, BinaryIO, Iterator, Optional, Tuple, Union
from typing_extensions import Self

import azure.common
import enum
@@ -182,7 +182,8 @@ def copy_file(

def _copy_file_from_bucket(
self,
source_bucket: AzureTransfer,
*,
source_bucket: Self,
source_key: str,
destination_key: str,
metadata: Optional[Metadata] = None,
@@ -228,13 +229,6 @@ def _copy_file_from_bucket(
f"Copying {repr(source_key)} to {repr(destination_key)} failed, unexpected status: {copy_props.status}"
)

def copy_files_from(self, *, source: BaseTransfer[SourceStorageModelT], keys: Collection[str]) -> None:
if isinstance(source, AzureTransfer):
for key in keys:
self._copy_file_from_bucket(source_bucket=source, source_key=key, destination_key=key, timeout=15)
else:
raise NotImplementedError

def get_metadata_for_key(self, key: str) -> Metadata:
path = self.format_key_for_backend(key, remove_slash_prefix=True, trailing_slash=False)
items = list(self._iter_key(path=path, with_metadata=True, deep=False))
15 changes: 15 additions & 0 deletions rohmu/object_storage/base.py
@@ -203,6 +203,21 @@ def copy_file(
raise NotImplementedError

def copy_files_from(self, *, source: BaseTransfer[SourceStorageModelT], keys: Collection[str]) -> None:
if isinstance(source, self.__class__):
for key in keys:
self._copy_file_from_bucket(source_bucket=source, source_key=key, destination_key=key, timeout=15)
else:
raise NotImplementedError

def _copy_file_from_bucket(
self,
*,
source_bucket: Self,
source_key: str,
destination_key: str,
metadata: Optional[Metadata] = None,
timeout: float = 15.0,
) -> None:
raise NotImplementedError

def format_key_for_backend(self, key: str, remove_slash_prefix: bool = False, trailing_slash: bool = False) -> str:
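This base implementation is what makes the commit message concrete: hooking the key iteration now takes a single override instead of one per provider. A hedged sketch, reusing the toy classes from the sketch above (`ObservedTransfer` is hypothetical, not part of rohmu):

```python
import logging
from typing import Collection

logger = logging.getLogger(__name__)


class ObservedTransfer(DictTransfer):
    """Wraps the shared loop once; any provider subclass could hook it the
    same way, with no per-provider copy_files_from left to patch."""

    def copy_files_from(self, *, source: BaseTransfer, keys: Collection[str]) -> None:
        keys = list(keys)  # materialize so we can report before and after
        logger.info("copying %d keys", len(keys))
        super().copy_files_from(source=source, keys=keys)
        logger.info("copied keys: %s", ", ".join(keys))
```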
19 changes: 8 additions & 11 deletions rohmu/object_storage/google.py
@@ -38,7 +38,6 @@
KEY_TYPE_OBJECT,
KEY_TYPE_PREFIX,
ProgressProportionCallbackType,
SourceStorageModelT,
)
from rohmu.object_storage.config import (
GOOGLE_DOWNLOAD_CHUNK_SIZE as DOWNLOAD_CHUNK_SIZE,
@@ -52,7 +51,6 @@
BinaryIO,
Callable,
cast,
Collection,
Iterable,
Iterator,
Optional,
@@ -62,7 +60,7 @@
TypeVar,
Union,
)
from typing_extensions import Protocol
from typing_extensions import Protocol, Self

import codecs
import dataclasses
@@ -349,7 +347,13 @@ def copy_file(
)

def _copy_file_from_bucket(
self, *, source_bucket: GoogleTransfer, source_key: str, destination_key: str, metadata: Optional[Metadata] = None
self,
*,
source_bucket: Self,
source_key: str,
destination_key: str,
metadata: Optional[Metadata] = None,
timeout: float = 15.0,
) -> None:
source_object = source_bucket.format_key_for_backend(source_key)
destination_object = self.format_key_for_backend(destination_key)
@@ -374,13 +378,6 @@ def _copy_file_from_bucket(
self.notifier.object_copied(key=destination_key, size=size, metadata=metadata)
reporter.report(self.stats)

def copy_files_from(self, *, source: BaseTransfer[SourceStorageModelT], keys: Collection[str]) -> None:
if isinstance(source, GoogleTransfer):
for key in keys:
self._copy_file_from_bucket(source_bucket=source, source_key=key, destination_key=key)
else:
raise NotImplementedError

def get_metadata_for_key(self, key: str) -> Metadata:
path = self.format_key_for_backend(key)
with self._object_client(not_found=path) as clob:
19 changes: 9 additions & 10 deletions rohmu/object_storage/local.py
@@ -18,12 +18,12 @@
KEY_TYPE_OBJECT,
KEY_TYPE_PREFIX,
ProgressProportionCallbackType,
SourceStorageModelT,
)
from rohmu.object_storage.config import LOCAL_CHUNK_SIZE as CHUNK_SIZE, LocalObjectStorageConfig as Config
from rohmu.typing import Metadata
from rohmu.util import BinaryStreamsConcatenation, ProgressStream
from typing import Any, BinaryIO, Collection, Iterator, Optional, TextIO, Tuple, Union
from typing import Any, BinaryIO, Iterator, Optional, TextIO, Tuple, Union
from typing_extensions import Self

import contextlib
import datetime
@@ -81,7 +81,13 @@ def copy_file(
)

def _copy_file_from_bucket(
self, *, source_bucket: LocalTransfer, source_key: str, destination_key: str, metadata: Optional[Metadata] = None
self,
*,
source_bucket: Self,
source_key: str,
destination_key: str,
metadata: Optional[Metadata] = None,
timeout: float = 15.0,
) -> None:
source_path = source_bucket.format_key_for_backend(source_key.strip("/"))
destination_path = self.format_key_for_backend(destination_key.strip("/"))
@@ -97,13 +103,6 @@ def _copy_file_from_bucket(
self._save_metadata(destination_path, new_metadata)
self.notifier.object_copied(key=destination_key, size=os.path.getsize(destination_path), metadata=metadata)

def copy_files_from(self, *, source: BaseTransfer[SourceStorageModelT], keys: Collection[str]) -> None:
if isinstance(source, LocalTransfer):
for key in keys:
self._copy_file_from_bucket(source_bucket=source, source_key=key, destination_key=key)
else:
raise NotImplementedError

def _get_metadata_for_key(self, key: str) -> Metadata:
source_path = self.format_key_for_backend(key.strip("/"))
if not os.path.exists(source_path):
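Since the local backend needs no credentials, it is the easiest way to exercise the extracted loop end to end. A hedged usage sketch (the `directory` parameter and the `store_file_from_memory` / `get_contents_to_string` methods follow rohmu's public API as I understand it at this commit; verify against your version before relying on them):

```python
import tempfile

from rohmu.object_storage.local import LocalTransfer

with tempfile.TemporaryDirectory() as src_dir, tempfile.TemporaryDirectory() as dst_dir:
    src = LocalTransfer(directory=src_dir)
    dst = LocalTransfer(directory=dst_dir)
    src.store_file_from_memory("backups/a.txt", b"payload")
    # Same-class source, so BaseTransfer.copy_files_from dispatches each key
    # to the LocalTransfer._copy_file_from_bucket shown above.
    dst.copy_files_from(source=src, keys=["backups/a.txt"])
    data, _metadata = dst.get_contents_to_string("backups/a.txt")
    assert data == b"payload"
```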
17 changes: 8 additions & 9 deletions rohmu/object_storage/s3.py
@@ -29,7 +29,6 @@
KEY_TYPE_OBJECT,
KEY_TYPE_PREFIX,
ProgressProportionCallbackType,
SourceStorageModelT,
)
from rohmu.object_storage.config import ( # noqa: F401
calculate_s3_chunk_size as calculate_chunk_size,
@@ -42,6 +41,7 @@
from rohmu.util import batched, ProgressStream
from threading import RLock
from typing import Any, BinaryIO, cast, Collection, Iterator, Optional, Tuple, TYPE_CHECKING, Union
from typing_extensions import Self

import botocore.client
import botocore.config
@@ -284,7 +284,13 @@ def copy_file(
)

def _copy_file_from_bucket(
self, *, source_bucket: S3Transfer, source_key: str, destination_key: str, metadata: Optional[Metadata] = None
self,
*,
source_bucket: Self,
source_key: str,
destination_key: str,
metadata: Optional[Metadata] = None,
timeout: float = 15.0,
) -> None:
source_path = (
source_bucket.bucket_name + "/" + source_bucket.format_key_for_backend(source_key, remove_slash_prefix=True)
@@ -307,13 +313,6 @@
else:
raise StorageError(f"Copying {source_key!r} to {destination_key!r} failed: {ex!r}") from ex

def copy_files_from(self, *, source: BaseTransfer[SourceStorageModelT], keys: Collection[str]) -> None:
if isinstance(source, S3Transfer):
for key in keys:
self._copy_file_from_bucket(source_bucket=source, source_key=key, destination_key=key)
else:
raise NotImplementedError

def get_metadata_for_key(self, key: str) -> Metadata:
path = self.format_key_for_backend(key, remove_slash_prefix=True)
return self._metadata_for_key(path)
