Skip to content

Commit

Permalink
clickhouse: add the total size on object storage to backup list
Browse files Browse the repository at this point in the history
Our backup sizes do not include the size of data stored in object storage. Add this to the
list result.
[DDB-1222]
  • Loading branch information
joelynch committed Sep 10, 2024
1 parent 53a6aa9 commit 1fd3bd7
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 10 deletions.
3 changes: 3 additions & 0 deletions astacus/common/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Copyright (c) 2020 Aiven Ltd
See LICENSE for details
"""

from .magic import DEFAULT_EMBEDDED_FILE_SIZE, StrEnum
from .progress import Progress
from .utils import now, SizeLimitedFile
Expand Down Expand Up @@ -356,6 +357,8 @@ class ListSingleBackup(msgspec.Struct, frozen=True, kw_only=True):
# using the files' hexdigest as key. As such, they are *not* sourced from BackupManifest.
cluster_files: int
cluster_data_size: int
# Total size in bytes of files on object storage
object_storage_size: int | None = None


class ListForStorage(msgspec.Struct, kw_only=True):
Expand Down
20 changes: 20 additions & 0 deletions astacus/coordinator/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
See LICENSE for details
"""

from astacus.common import ipc, magic
from astacus.common.storage import JsonStorage
from astacus.coordinator.plugins.clickhouse.manifest import ClickHouseManifest
from astacus.coordinator.storage_factory import StorageFactory
from collections import defaultdict
from collections.abc import Iterator, Mapping
Expand Down Expand Up @@ -39,6 +41,22 @@ def compute_deduplicated_snapshot_file_stats(manifest: ipc.BackupManifest) -> tu
return num_files, total_size


def get_object_storage_size_bytes(manifest: ipc.BackupManifest) -> int | None:
    """Compute the total size of object storage files for the backup in bytes."""
    if manifest.plugin != ipc.Plugin.clickhouse:
        # Only the ClickHouse plugin records object storage files.
        return None
    clickhouse_data = ClickHouseManifest.from_plugin_data(manifest.plugin_data)
    # A missing total_size_bytes means the manifest predates size tracking;
    # report "unknown" (None) rather than a partial sum in that case.
    # In practice either every disk carries a size or none does.
    per_disk_sizes = [files.total_size_bytes for files in clickhouse_data.object_storage_files]
    if any(size is None for size in per_disk_sizes):
        return None
    return sum(per_disk_sizes)


def _iter_backups(
storage: JsonStorage, backup_prefix: str, storage_cache: CachedStorageListEntries
) -> Iterator[ipc.ListSingleBackup]:
Expand All @@ -56,6 +74,7 @@ def _iter_backups(
upload_size = sum(x.total_size for x in manifest.upload_results)
upload_stored_size = sum(x.total_stored_size for x in manifest.upload_results)
cluster_files, cluster_data_size = compute_deduplicated_snapshot_file_stats(manifest)
object_storage_size = get_object_storage_size_bytes(manifest)
yield ipc.ListSingleBackup(
name=backup_name,
start=manifest.start,
Expand All @@ -69,6 +88,7 @@ def _iter_backups(
cluster_data_size=cluster_data_size,
upload_size=upload_size,
upload_stored_size=upload_stored_size,
object_storage_size=object_storage_size,
)


Expand Down
2 changes: 2 additions & 0 deletions astacus/coordinator/plugins/clickhouse/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,14 @@ def from_plugin_data(cls, data: dict[str, Any]) -> Self:
class ClickHouseObjectStorageFiles(AstacusModel):
    # Object storage paths referenced by a single ClickHouse disk, plus the
    # total size in bytes of those objects.  total_size_bytes is None for
    # manifests written before size tracking was added.
    disk_name: str
    files: list[ClickHouseObjectStorageFile]
    total_size_bytes: int | None = None

    @classmethod
    def from_plugin_data(cls, data: dict[str, Any]) -> Self:
        """Deserialize an instance from raw manifest plugin data."""
        parsed_files = [ClickHouseObjectStorageFile.from_plugin_data(raw_file) for raw_file in data["files"]]
        return cls(
            disk_name=data["disk_name"],
            files=parsed_files,
            # .get(): older manifests do not carry this key at all.
            total_size_bytes=data.get("total_size_bytes"),
        )


Expand Down
6 changes: 5 additions & 1 deletion astacus/coordinator/plugins/clickhouse/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ class CollectObjectStorageFilesStep(Step[list[ClickHouseObjectStorageFiles]]):
async def run_step(self, cluster: Cluster, context: StepsContext) -> list[ClickHouseObjectStorageFiles]:
snapshot_results: Sequence[ipc.SnapshotResult] = context.get_result(SnapshotStep)
object_storage_files: dict[str, set[str]] = {}
total_size_bytes = 0
for snapshot_result in snapshot_results:
assert snapshot_result.state is not None
for snapshot_file in snapshot_result.state.files:
Expand All @@ -287,9 +288,12 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> list[ClickH
raise StepFailedError(f"Invalid file metadata in {snapshot_file.relative_path}: {e}") from e
for object_metadata in file_metadata.objects:
object_storage_files.setdefault(parsed_path.disk.name, set()).add(object_metadata.relative_path)
total_size_bytes += object_metadata.size_bytes
return [
ClickHouseObjectStorageFiles(
disk_name=disk_name, files=[ClickHouseObjectStorageFile(path=path) for path in sorted(paths)]
disk_name=disk_name,
files=[ClickHouseObjectStorageFile(path=path) for path in sorted(paths)],
total_size_bytes=total_size_bytes,
)
for disk_name, paths in sorted(object_storage_files.items())
]
Expand Down
17 changes: 11 additions & 6 deletions tests/unit/coordinator/plugins/clickhouse/test_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,15 @@
),
]

SAMPLE_OBJET_STORAGE_FILES = [
SAMPLE_OBJECT_STORAGE_FILES = [
ClickHouseObjectStorageFiles(
disk_name="remote",
files=[
ClickHouseObjectStorageFile(path="abc/defghi"),
ClickHouseObjectStorageFile(path="jkl/mnopqr"),
ClickHouseObjectStorageFile(path="stu/vwxyza"),
],
total_size_bytes=400,
)
]

Expand All @@ -164,7 +165,7 @@
access_entities=SAMPLE_ENTITIES,
replicated_databases=SAMPLE_DATABASES,
tables=SAMPLE_TABLES,
object_storage_files=SAMPLE_OBJET_STORAGE_FILES,
object_storage_files=SAMPLE_OBJECT_STORAGE_FILES,
user_defined_functions=SAMPLE_USER_DEFINED_FUNCTIONS,
)

Expand Down Expand Up @@ -444,8 +445,8 @@ async def test_retrieve_macros() -> None:
]


def create_remote_file(path: str, remote_path: str) -> SnapshotFile:
metadata = f"""3\n1\t100\n100\t{remote_path}\n1\n0\n""".encode()
def create_remote_file(path: str, remote_path: str, size: int) -> SnapshotFile:
metadata = f"""3\n1\t{size}\n{size}\t{remote_path}\n1\n0\n""".encode()
return SnapshotFile(
relative_path=f"disks/remote/{path}",
file_size=len(metadata),
Expand All @@ -472,10 +473,12 @@ async def test_collect_object_storage_file_steps() -> None:
create_remote_file(
f"store/{table_uuid_parts}/all_0_0_0/columns.txt",
"abc/defghi",
size=100,
),
create_remote_file(
f"store/{table_uuid_parts}/all_0_0_0/data.bin",
"jkl/mnopqr",
size=100,
),
],
)
Expand All @@ -487,17 +490,19 @@ async def test_collect_object_storage_file_steps() -> None:
create_remote_file(
f"store/{table_uuid_parts}/all_0_0_0/columns.txt",
"abc/defghi",
size=0,
),
create_remote_file(
f"store/{table_uuid_parts}/all_0_0_0/data.bin",
"stu/vwxyza",
size=200,
),
],
)
),
]
context.set_result(SnapshotStep, snapshot_results)
assert await step.run_step(Cluster(nodes=[]), context) == SAMPLE_OBJET_STORAGE_FILES
assert await step.run_step(Cluster(nodes=[]), context) == SAMPLE_OBJECT_STORAGE_FILES


async def test_move_frozen_parts_steps() -> None:
Expand Down Expand Up @@ -554,7 +559,7 @@ async def test_create_clickhouse_manifest() -> None:
context = StepsContext()
context.set_result(RetrieveAccessEntitiesStep, SAMPLE_ENTITIES)
context.set_result(RetrieveDatabasesAndTablesStep, (SAMPLE_DATABASES, SAMPLE_TABLES))
context.set_result(CollectObjectStorageFilesStep, SAMPLE_OBJET_STORAGE_FILES)
context.set_result(CollectObjectStorageFilesStep, SAMPLE_OBJECT_STORAGE_FILES)
context.set_result(RetrieveUserDefinedFunctionsStep, SAMPLE_USER_DEFINED_FUNCTIONS)
assert await step.run_step(Cluster(nodes=[]), context) == SAMPLE_MANIFEST_ENCODED

Expand Down
51 changes: 48 additions & 3 deletions tests/unit/coordinator/test_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
Test that the list endpoint behaves as advertised
"""

from astacus.common import ipc
from astacus.common.ipc import (
BackupManifest,
ListForStorage,
Expand All @@ -19,7 +21,13 @@
)
from astacus.coordinator import api
from astacus.coordinator.api import get_cache_entries_from_list_response
from astacus.coordinator.list import compute_deduplicated_snapshot_file_stats, list_backups
from astacus.coordinator.list import compute_deduplicated_snapshot_file_stats, get_object_storage_size_bytes, list_backups
from astacus.coordinator.plugins.clickhouse.manifest import (
ClickHouseBackupVersion,
ClickHouseManifest,
ClickHouseObjectStorageFile,
ClickHouseObjectStorageFiles,
)
from astacus.coordinator.storage_factory import StorageFactory
from fastapi.testclient import TestClient
from pathlib import Path
Expand Down Expand Up @@ -53,6 +61,7 @@ def _run():
"total_size": 6,
"upload_size": 6,
"upload_stored_size": 10,
"object_storage_size": None,
},
{
"attempt": 1,
Expand All @@ -67,6 +76,7 @@ def _run():
"total_size": 6,
"upload_size": 6,
"upload_stored_size": 10,
"object_storage_size": None,
},
],
"storage_name": "x",
Expand All @@ -86,6 +96,7 @@ def _run():
"total_size": 6,
"upload_size": 6,
"upload_stored_size": 10,
"object_storage_size": None,
}
],
"storage_name": "y",
Expand Down Expand Up @@ -210,7 +221,7 @@ def fixture_backup_manifest() -> BackupManifest:
SnapshotUploadResult(total_size=4000, total_stored_size=3000),
SnapshotUploadResult(total_size=5000, total_stored_size=4000),
],
plugin=Plugin.clickhouse,
plugin=Plugin.cassandra,
)


Expand Down Expand Up @@ -239,7 +250,7 @@ def test_api_list_deduplication(backup_manifest: BackupManifest, tmp_path: Path)
name="1",
start=datetime.datetime(2020, 1, 2, 3, 4, 5, 678, tzinfo=datetime.timezone.utc),
end=datetime.datetime(2020, 1, 2, 5, 6, 7, 891, tzinfo=datetime.timezone.utc),
plugin=Plugin("clickhouse"),
plugin=Plugin.cassandra,
attempt=1,
nodes=2,
cluster_files=6,
Expand All @@ -248,6 +259,7 @@ def test_api_list_deduplication(backup_manifest: BackupManifest, tmp_path: Path)
total_size=9000,
upload_size=9000,
upload_stored_size=7000,
object_storage_size=None,
)
],
),
Expand Down Expand Up @@ -326,3 +338,36 @@ def create_backup(name: str) -> ListSingleBackup:
upload_size=9000,
upload_stored_size=7000,
)


def test_list_object_storage_size() -> None:
    """The reported object storage size of a ClickHouse backup is the sum over all disks."""
    # NOTE: the previously requested `backup_manifest` fixture was unused — this
    # test builds its own manifest — so the parameter has been dropped to avoid
    # constructing the fixture needlessly.
    manifest = BackupManifest(
        start=datetime.datetime(2020, 1, 2, 3, 4, 5, 678, tzinfo=datetime.timezone.utc),
        end=datetime.datetime(2020, 1, 2, 5, 6, 7, 891, tzinfo=datetime.timezone.utc),
        attempt=1,
        snapshot_results=[],
        upload_results=[],
        plugin=Plugin.clickhouse,
        plugin_data=ClickHouseManifest(
            version=ClickHouseBackupVersion.V2,
            object_storage_files=[
                ClickHouseObjectStorageFiles(
                    disk_name="disk1",
                    files=[
                        ClickHouseObjectStorageFile(path="path1"),
                        ClickHouseObjectStorageFile(path="path2"),
                    ],
                    total_size_bytes=1000,
                ),
                ClickHouseObjectStorageFiles(
                    disk_name="disk2",
                    files=[
                        ClickHouseObjectStorageFile(path="path3"),
                        ClickHouseObjectStorageFile(path="path4"),
                    ],
                    total_size_bytes=2000,
                ),
            ],
        ).dict(),
    )
    # 1000 + 2000 across the two disks.
    assert get_object_storage_size_bytes(manifest) == 3000

0 comments on commit 1fd3bd7

Please sign in to comment.