Skip to content

Commit

Permalink
rohmu: add progress callback for transferred object keys
Browse files Browse the repository at this point in the history
Added callback progress_fn to copy_files_from
which tracks the progress of transferred objects.

This does not measure the number of bytes
in order to avoid a provider based implementation.
  • Loading branch information
tilman-aiven committed Aug 21, 2024
1 parent b6b332b commit 9a596fc
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
15 changes: 13 additions & 2 deletions rohmu/object_storage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
TypeVar,
Union,
)
from typing_extensions import Self
from typing_extensions import Self, TypeAlias

import logging
import os
Expand All @@ -55,6 +55,9 @@ class IterKeyItem(NamedTuple):
# Argument is the additional number of bytes transferred
IncrementalProgressCallbackType = Optional[Callable[[int], None]]

# Argument is the transferred object key
ObjectTransferProgressCallbackType: TypeAlias = Optional[Callable[[str], None]]


@dataclass(frozen=True, unsafe_hash=True)
class ConcurrentUpload:
Expand Down Expand Up @@ -202,10 +205,18 @@ def copy_file(
cannot be copied with this method. If no metadata is given copies the existing metadata."""
raise NotImplementedError

def copy_files_from(self, *, source: BaseTransfer[SourceStorageModelT], keys: Collection[str]) -> None:
def copy_files_from(
self,
*,
source: BaseTransfer[SourceStorageModelT],
keys: Collection[str],
progress_fn: ObjectTransferProgressCallbackType = None,
) -> None:
if isinstance(source, self.__class__):
for key in keys:
self._copy_file_from_bucket(source_bucket=source, source_key=key, destination_key=key, timeout=15)
if progress_fn is not None:
progress_fn(key)
else:
raise NotImplementedError

Expand Down
9 changes: 5 additions & 4 deletions test/object_storage/test_object_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from rohmu import errors
from rohmu.object_storage.local import LocalTransfer
from typing import Any
from unittest import mock
from unittest.mock import MagicMock

import pytest

Expand Down Expand Up @@ -72,15 +74,14 @@ def test_copy(transfer_type: str, request: Any) -> None:
def test_copy_local_files_from(tmp_path: Path) -> None:
source = LocalTransfer(tmp_path / "source", prefix="s-prefix")
destination = LocalTransfer(tmp_path / "destination", prefix="d-prefix")
mock_progress_fn = MagicMock(return_value=None)

source.store_file_from_memory("some/a/key.ext", b"content_a", metadata={"info": "aaa"})
source.store_file_from_memory("some/b/key.ext", b"content_b", metadata={"info": "bbb"})
destination.copy_files_from(
source=source,
keys=["some/a/key.ext", "some/b/key.ext"],
)
destination.copy_files_from(source=source, keys=["some/a/key.ext", "some/b/key.ext"], progress_fn=mock_progress_fn)
assert destination.get_contents_to_string("some/a/key.ext") == (b"content_a", {"info": "aaa", "Content-Length": "9"})
assert destination.get_contents_to_string("some/b/key.ext") == (b"content_b", {"info": "bbb", "Content-Length": "9"})
assert mock_progress_fn.call_args_list == [mock.call("some/a/key.ext"), mock.call("some/b/key.ext")]


@pytest.mark.parametrize("transfer_type", ["local_transfer"])
Expand Down

0 comments on commit 9a596fc

Please sign in to comment.