Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle deleted buckets gracefully #190

Merged
merged 2 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions rohmu/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ class InvalidConfigurationError(Error):
"""Invalid configuration"""


class TransferObjectStoreInitializationError(Error):
    """Raised when a transient network or permission issue does not allow us to validate access to the object store.

    Base class for the more specific TransferObjectStorePermissionError and
    TransferObjectStoreMissingError.
    """

class TransferObjectStorePermissionError(TransferObjectStoreInitializationError):
    """Raised when a permission issue (e.g. an HTTP 403) does not allow us to validate access to the object store."""


class TransferObjectStoreMissingError(TransferObjectStoreInitializationError):
    """Raised when we know for sure the bucket (or container) is missing."""


class LocalFileIsRemoteFileError(StorageError):
    """File transfer operation source and destination point to the same file."""

Expand Down
12 changes: 8 additions & 4 deletions rohmu/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,23 @@ def get_transfer_model(storage_config: Config) -> StorageModel:
return storage_class.config_model(**storage_config)


def get_transfer(storage_config: Config, ensure_object_store_available: bool = True) -> BaseTransfer[Any]:
    """Build a transfer object from a plain configuration dict.

    A "notifier" key, if present, is popped from the config and turned into a
    Notifier instance; the remaining keys are validated into a storage model.

    ensure_object_store_available: when True (the backwards-compatible default)
    the constructed transfer validates/creates the backing object store during
    initialization; pass False to defer that to verify_object_storage /
    create_object_store_if_needed.
    """
    # Copy so popping "notifier" does not mutate the caller's dict.
    storage_config = storage_config.copy()
    notifier_config = storage_config.pop("notifier", None)
    notifier = None
    if notifier_config is not None:
        notifier = get_notifier(notifier_config)
    model = get_transfer_model(storage_config)
    return get_transfer_from_model(model, notifier, ensure_object_store_available=ensure_object_store_available)


def get_transfer_from_model(
    model: StorageModelT,
    notifier: Optional[Notifier] = None,
    ensure_object_store_available: bool = True,
) -> BaseTransfer[StorageModelT]:
    """Instantiate the concrete transfer class for the model's storage driver.

    ensure_object_store_available is forwarded to the transfer constructor via
    from_model (True keeps the historical validate-on-init behaviour).
    """
    storage_class = get_class_for_storage_driver(model.storage_type)
    return storage_class.from_model(model, notifier, ensure_object_store_available=ensure_object_store_available)


def _to_storage_driver(storage_type: str) -> StorageDriver:
Expand Down
96 changes: 80 additions & 16 deletions rohmu/object_storage/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@
from azure.core.exceptions import HttpResponseError, ResourceExistsError
from azure.storage.blob import BlobServiceClient, ContentSettings
from rohmu.common.statsd import StatsdConfig
from rohmu.errors import FileNotFoundFromStorageError, InvalidConfigurationError, StorageError
from rohmu.errors import (
FileNotFoundFromStorageError,
InvalidConfigurationError,
StorageError,
TransferObjectStoreInitializationError,
TransferObjectStoreMissingError,
TransferObjectStorePermissionError,
)
from rohmu.notifier.interface import Notifier
from rohmu.object_storage.base import (
BaseTransfer,
Expand Down Expand Up @@ -61,15 +68,21 @@ def __init__(
proxy_info: Optional[dict[str, Union[str, int]]] = None,
notifier: Optional[Notifier] = None,
statsd_info: Optional[StatsdConfig] = None,
ensure_object_store_available: bool = True,
) -> None:
prefix = prefix.lstrip("/") if prefix else ""
super().__init__(prefix=prefix, notifier=notifier, statsd_info=statsd_info)
super().__init__(
prefix=prefix,
notifier=notifier,
statsd_info=statsd_info,
ensure_object_store_available=ensure_object_store_available,
)
if not account_key and not sas_token:
raise InvalidConfigurationError("One of account_key or sas_token must be specified to authenticate")

self.account_name = account_name
self.account_key = account_key
self.container_name = bucket_name
self.container = self.container_name = bucket_name
self.sas_token = sas_token
self._conn_str = self.conn_string(
account_name=account_name,
Expand All @@ -95,9 +108,34 @@ def __init__(
schema = "http"
self._config["proxies"] = {"https": f"{schema}://{auth}{proxy_host}:{proxy_port}"}
self._blob_service_client: Optional[BlobServiceClient] = None
self.container = self.get_or_create_container(self.container_name)
if ensure_object_store_available:
self._create_object_store_if_needed_unwrapped()
self.log.debug("AzureTransfer initialized, %r", self.container_name)

def _verify_object_storage_unwrapped(self) -> None:
    """Check the container is accessible without creating it; Azure exceptions propagate as-is."""
    self.get_or_create_container(self.container_name, create_if_needed=False)

def verify_object_storage(self) -> None:
    """Verify access to the container, translating Azure errors into Rohmu ones.

    Raises TransferObjectStorePermissionError for 403 responses and
    TransferObjectStoreInitializationError for any other HttpResponseError.
    """
    try:
        self._verify_object_storage_unwrapped()
    except HttpResponseError as exc:
        if exc.status_code == 403:
            raise TransferObjectStorePermissionError() from exc
        raise TransferObjectStoreInitializationError() from exc

def _create_object_store_if_needed_unwrapped(self) -> None:
    """Create the container if it does not already exist; Azure exceptions propagate as-is."""
    self.get_or_create_container(self.container_name, create_if_needed=True)

def create_object_store_if_needed(self) -> None:
    """Create the container if needed, translating Azure errors into Rohmu ones.

    Raises TransferObjectStorePermissionError for 403 responses and
    TransferObjectStoreInitializationError for any other HttpResponseError.
    """
    try:
        self._create_object_store_if_needed_unwrapped()
    except HttpResponseError as exc:
        if exc.status_code == 403:
            raise TransferObjectStorePermissionError() from exc
        raise TransferObjectStoreInitializationError() from exc

def get_blob_service_client(self) -> BlobServiceClient:
if self._blob_service_client is None:
self._blob_service_client = BlobServiceClient.from_connection_string(
Expand Down Expand Up @@ -168,7 +206,16 @@ def _copy_file_from_bucket(
if time.monotonic() - start < timeout:
time.sleep(0.1)
else:
destination_client.abort_copy(copy_props.id, timeout=timeout)
if copy_props.id is None:
# The description of CopyProperties tells us copy_id should not be None unless status is also None
# The type checker cannot know this, but we still log a warning if this ever happens
self.log.warning(
"Pending copy operation from %r to %r was missing a copy_id, will not be aborted after timeout",
source_key,
destination_key,
)
else:
destination_client.abort_copy(copy_props.id, timeout=timeout)
raise StorageError(
f"Copying {repr(source_key)} to {repr(destination_key)} did not complete in {timeout} seconds"
)
Expand Down Expand Up @@ -402,21 +449,38 @@ def progress_callback(pipeline_response: Any) -> None:
else:
delattr(fd, "tell")

def get_or_create_container(self, container_name: str, create_if_needed: bool = True) -> str:
    """Ensure the container exists and return its (string) name.

    With create_if_needed=True the container is created, tolerating the
    already-exists case. With create_if_needed=False only existence is checked
    and TransferObjectStoreMissingError is raised when the container is absent.
    In both modes a 403 from Azure is treated as "assume it exists", because
    container-scoped SAS tokens may lack permissions on the container itself.
    """
    if isinstance(container_name, enum.Enum):
        # ensure that the enum value is used rather than the enum name
        # https://github.com/Azure/azure-sdk-for-python/blob/azure-storage-blob_12.8.1/sdk/storage/azure-storage-blob/azure/storage/blob/_blob_service_client.py#L667
        container_name = container_name.value
    start_time = time.monotonic()
    if create_if_needed:
        try:
            self.get_blob_service_client().create_container(container_name)
        except ResourceExistsError:
            pass
        except HttpResponseError as e:
            if "request is not authorized" in e.exc_msg:
                # Corresponds to a 403/AuthorizationFailure
                self.log.debug("Container creation unauthorized. Assuming container %r already exists", container_name)
                return container_name
            else:
                raise e
    else:
        try:
            container_exists = self.get_blob_service_client().get_container_client(container_name).exists()
        except HttpResponseError as e:
            if e.status_code == 403:
                # We have to handle 403/AuthorizationFailure gracefully as container-scoped SAS tokens might have
                # all permissions inside the container but won't have permissions on the container itself and we cannot
                # assume we'd have an account-scoped SAS token or account key here
                self.log.debug("Container metadata unauthorized. Assuming container %r already exists", container_name)
                return container_name
            raise
        if not container_exists:
            self.log.debug("Container %r does not exist, creation not requested", container_name)
            raise TransferObjectStoreMissingError()
    self.log.debug("Got/Created container: %r successfully, took: %.3fs", container_name, time.monotonic() - start_time)
    return container_name
62 changes: 58 additions & 4 deletions rohmu/object_storage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
from io import BytesIO
from rohmu.common.models import StorageModel
from rohmu.common.statsd import StatsClient, StatsdConfig
from rohmu.errors import FileNotFoundFromStorageError, InvalidByteRangeError, StorageError
from rohmu.errors import (
FileNotFoundFromStorageError,
InvalidByteRangeError,
StorageError,
)
from rohmu.notifier.interface import Notifier
from rohmu.notifier.null import NullNotifier
from rohmu.object_storage.config import StorageModelT
Expand Down Expand Up @@ -71,8 +75,18 @@ class BaseTransfer(Generic[StorageModelT]):
supports_concurrent_upload: bool = False

def __init__(
self, prefix: Optional[str], notifier: Optional[Notifier] = None, statsd_info: Optional[StatsdConfig] = None
self,
prefix: Optional[str],
notifier: Optional[Notifier] = None,
statsd_info: Optional[StatsdConfig] = None,
ensure_object_store_available: bool = True,
) -> None:
"""
Initialize a Transfer instance. Although it's possible to do implementation-specific IO here, it's preferable to
delegate to verify_object_storage and create_object_store_if_needed.

The ensure_object_store_available flag is here to allow this transition in a backwards-compatible manner.
"""
self.log = logging.getLogger(self.__class__.__name__)
if not prefix:
prefix = ""
Expand All @@ -82,6 +96,40 @@ def __init__(
self.notifier = notifier or NullNotifier()
self.stats = StatsClient(statsd_info)

def _verify_object_storage_unwrapped(self) -> None:
    """
    Perform read-only operations to verify the backing object store is available and accessible.

    Raises the implementation-specific exception as-is. This is mainly useful for backwards-compatibility and should
    never be called directly.
    """
    # Abstract hook: concrete transfers must override this.
    raise NotImplementedError

def verify_object_storage(self) -> None:
    """
    Perform read-only operations to verify the backing object store is available and accessible.

    Raise Rohmu-specific error TransferObjectStoreInitializationError to abstract away implementation-specific details.
    """
    # Abstract hook: concrete transfers must override this.
    raise NotImplementedError

def _create_object_store_if_needed_unwrapped(self) -> None:
    """
    Create the backing object store if it's needed (e.g. creating directories, buckets, etc.).

    Raises the implementation-specific exception as-is. This is mainly useful for backwards-compatibility and should
    never be called directly.
    """
    # Abstract hook: concrete transfers must override this.
    raise NotImplementedError

def create_object_store_if_needed(self) -> None:
    """
    Create the backing object store if it's needed (e.g. creating directories, buckets, etc.).

    Raise Rohmu-specific error TransferObjectStoreInitializationError to abstract away implementation-specific details.
    """
    # Abstract hook: concrete transfers must override this.
    raise NotImplementedError

def close(self) -> None:
    """Release all resources associated with the Transfer object."""
    # No-op by default; subclasses holding live connections override this.
    pass
Expand Down Expand Up @@ -138,8 +186,14 @@ def _should_multipart(
return int(size) > chunk_size

@classmethod
def from_model(
    cls, model: StorageModelT, notifier: Optional[Notifier] = None, ensure_object_store_available: bool = True
) -> Self:
    """Construct a transfer instance from its validated storage model.

    The model's fields (minus "storage_type") become constructor keyword
    arguments; ensure_object_store_available is forwarded so callers can
    defer object-store validation/creation.
    """
    return cls(
        **model.dict(by_alias=True, exclude={"storage_type"}),
        notifier=notifier,
        ensure_object_store_available=ensure_object_store_available,
    )

def copy_file(
self, *, source_key: str, destination_key: str, metadata: Optional[Metadata] = None, **_kwargs: Any
Expand Down
Loading
Loading