From 8a9c8d405d7165d8914329caa03e4a6632e9ab46 Mon Sep 17 00:00:00 2001 From: Aris Tritas Date: Tue, 16 Apr 2024 15:42:21 +0200 Subject: [PATCH 1/2] Add node to backup index type alias --- astacus/coordinator/plugins/base.py | 16 +++++++++------- astacus/coordinator/plugins/m3db.py | 3 ++- tests/unit/coordinator/test_restore.py | 2 +- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/astacus/coordinator/plugins/base.py b/astacus/coordinator/plugins/base.py index bcb5a7b8..f3e971e2 100644 --- a/astacus/coordinator/plugins/base.py +++ b/astacus/coordinator/plugins/base.py @@ -17,7 +17,7 @@ from astacus.coordinator.manifest import download_backup_manifest, download_backup_min_manifest from collections import Counter from collections.abc import Sequence, Set -from typing import Any, Counter as TCounter, Generic, TypeVar +from typing import Any, Counter as TCounter, Generic, TypeAlias, TypeVar import dataclasses import datetime @@ -30,6 +30,8 @@ T = TypeVar("T") StepResult_co = TypeVar("StepResult_co", covariant=True) +NodeBackupIndices: TypeAlias = Sequence[int | None] + class CoordinatorPlugin(AstacusModel): def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]: @@ -284,14 +286,14 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> ipc.BackupM @dataclasses.dataclass -class MapNodesStep(Step[Sequence[int | None]]): +class MapNodesStep(Step[NodeBackupIndices]): """ Create an index mapping nodes from cluster configuration to nodes in the backup manifest. """ partial_restore_nodes: Sequence[ipc.PartialRestoreRequestNode] | None = None - async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[int | None]: + async def run_step(self, cluster: Cluster, context: StepsContext) -> NodeBackupIndices: # AZ distribution should in theory be forced to match, but in # practise it doesn't really matter. 
So we restore nodes 'as # well as we can' and hope that is well enough (or whoever @@ -475,7 +477,7 @@ async def download_delta( *, nodes: Sequence[CoordinatorNode], cluster: Cluster, - node_to_backup_index: Sequence[int | None], + node_to_backup_index: NodeBackupIndices, delta_manifest: ipc.BackupManifest, ) -> None: reqs: list[ipc.NodeRequest] = [] @@ -520,7 +522,7 @@ async def clear_delta( *, nodes: Sequence[CoordinatorNode], cluster: Cluster, - node_to_backup_index: Sequence[int | None], + node_to_backup_index: NodeBackupIndices, delta_manifest: ipc.BackupManifest, ) -> None: reqs: list[ipc.NodeRequest] = [] @@ -706,7 +708,7 @@ def get_node_to_backup_index_from_partial_restore_nodes( partial_restore_nodes: Sequence[ipc.PartialRestoreRequestNode], snapshot_results: Sequence[ipc.SnapshotResult], nodes: Sequence[CoordinatorNode], -) -> Sequence[int | None]: +) -> NodeBackupIndices: node_to_backup_index: list[int | None] = [None] * len(nodes) hostname_to_backup_index: dict[str | None, int] = {} url_to_node_index: dict[str | None, int] = {} @@ -751,7 +753,7 @@ def get_node_to_backup_index_from_azs( nodes: Sequence[CoordinatorNode], azs_in_backup: TCounter[str], azs_in_nodes: TCounter[str], -) -> Sequence[int | None]: +) -> NodeBackupIndices: node_to_backup_index: list[int | None] = [None] * len(nodes) # This is strictly speaking just best-effort assignment for (backup_az, backup_n), (node_az, node_n) in zip(azs_in_backup.most_common(), azs_in_nodes.most_common()): diff --git a/astacus/coordinator/plugins/m3db.py b/astacus/coordinator/plugins/m3db.py index d1425632..913f0d45 100644 --- a/astacus/coordinator/plugins/m3db.py +++ b/astacus/coordinator/plugins/m3db.py @@ -14,6 +14,7 @@ CoordinatorPlugin, ListHexdigestsStep, MapNodesStep, + NodeBackupIndices, OperationContext, RestoreStep, SnapshotStep, @@ -191,7 +192,7 @@ def validate_m3_config(placement_nodes: Sequence[m3placement.M3PlacementNode], n def rewrite_m3db_placement( *, key: ETCDKey, - node_to_backup_index: 
Sequence[int | None], + node_to_backup_index: NodeBackupIndices, src_placement_nodes: Sequence[m3placement.M3PlacementNode], dst_placement_nodes: Sequence[m3placement.M3PlacementNode], nodes: Sequence[CoordinatorNode], diff --git a/tests/unit/coordinator/test_restore.py b/tests/unit/coordinator/test_restore.py index b50087a6..4db75fd5 100644 --- a/tests/unit/coordinator/test_restore.py +++ b/tests/unit/coordinator/test_restore.py @@ -9,7 +9,7 @@ from astacus.common.ipc import Plugin from astacus.common.rohmustorage import MultiRohmuStorage from astacus.coordinator.config import CoordinatorNode -from astacus.coordinator.plugins.base import get_node_to_backup_index +from astacus.coordinator.plugins.base import get_node_to_backup_index, NodeBackupIndices from collections.abc import Callable from contextlib import AbstractContextManager, nullcontext as does_not_raise from dataclasses import dataclass From 1884bde44495892b9996497e6775b30fbc0629dd Mon Sep 17 00:00:00 2001 From: Aris Tritas Date: Tue, 16 Apr 2024 15:56:02 +0200 Subject: [PATCH 2/2] Add option to ignore imbalanced AZs on node map step When restoring a cluster, the new nodes' AZ distribution may not match the one stored in the backup manifest. In this case, the `MapNodesStep` fails even though the new nodes may actually be better distributed now. The change allows mapping new nodes to their backup index in the presence of an imperfect match between node AZs and backup AZs. A new exception, `ImbalancedAZsException`, is added to identify the imbalance error and catch it. The step's new `ignore_imbalanced_azs` option defaults to false so as to retain backwards compatibility with the previous behaviour. Note that a per-AZ node shortfall now raises `ImbalancedAZsException` instead of `InsufficientNodesException`, so callers that catch the latter for this case need updating. Ideally, a better solution would be to consider an AZ imbalance metric relative to some target replication factor, instead of naively remapping the backup indices. However, let's say this is good enough for the moment. 
--- astacus/common/exceptions.py | 4 + astacus/coordinator/plugins/base.py | 33 +++-- .../coordinator/plugins/cassandra/plugin.py | 2 +- tests/unit/coordinator/test_restore.py | 117 +++++++++++++++--- 4 files changed, 130 insertions(+), 26 deletions(-) diff --git a/astacus/common/exceptions.py b/astacus/common/exceptions.py index 261f919e..726b4353 100644 --- a/astacus/common/exceptions.py +++ b/astacus/common/exceptions.py @@ -24,6 +24,10 @@ class InsufficientAZsException(PermanentException): pass +class ImbalancedAZsException(PermanentException): + pass + + class NotFoundException(PermanentException): pass diff --git a/astacus/coordinator/plugins/base.py b/astacus/coordinator/plugins/base.py index f3e971e2..dd0e6936 100644 --- a/astacus/coordinator/plugins/base.py +++ b/astacus/coordinator/plugins/base.py @@ -292,6 +292,7 @@ class MapNodesStep(Step[NodeBackupIndices]): """ partial_restore_nodes: Sequence[ipc.PartialRestoreRequestNode] | None = None + ignore_imbalanced_azs: bool = False async def run_step(self, cluster: Cluster, context: StepsContext) -> NodeBackupIndices: # AZ distribution should in theory be forced to match, but in @@ -307,6 +308,7 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> NodeBackupI partial_restore_nodes=self.partial_restore_nodes, snapshot_results=snapshot_results, nodes=cluster.nodes, + ignore_imbalanced_azs=self.ignore_imbalanced_azs, ) @@ -676,7 +678,8 @@ def get_node_to_backup_index( partial_restore_nodes: Sequence[ipc.PartialRestoreRequestNode] | None, snapshot_results: Sequence[ipc.SnapshotResult], nodes: Sequence[CoordinatorNode], -) -> Sequence[int | None]: + ignore_imbalanced_azs: bool = False, +) -> NodeBackupIndices: if partial_restore_nodes: return get_node_to_backup_index_from_partial_restore_nodes( partial_restore_nodes=partial_restore_nodes, @@ -694,13 +697,25 @@ def get_node_to_backup_index( if len(azs_in_backup) > len(azs_in_nodes): azs_missing = len(azs_in_backup) - len(azs_in_nodes) raise 
exceptions.InsufficientAZsException(f"{azs_missing} az(s) missing - unable to restore backup") - - return get_node_to_backup_index_from_azs( - snapshot_results=snapshot_results, - nodes=nodes, - azs_in_backup=azs_in_backup, - azs_in_nodes=azs_in_nodes, - ) + try: + return get_node_to_backup_index_from_azs( + snapshot_results=snapshot_results, + nodes=nodes, + azs_in_backup=azs_in_backup, + azs_in_nodes=azs_in_nodes, + ) + except exceptions.ImbalancedAZsException: + if not ignore_imbalanced_azs: + raise + modified_snapshot_results = [ + msgspec.structs.replace(result, az=node.az) for result, node in zip(snapshot_results, nodes, strict=True) + ] + return get_node_to_backup_index_from_azs( + snapshot_results=modified_snapshot_results, + nodes=nodes, + azs_in_backup=azs_in_nodes, + azs_in_nodes=azs_in_nodes, + ) def get_node_to_backup_index_from_partial_restore_nodes( @@ -759,7 +774,7 @@ def get_node_to_backup_index_from_azs( for (backup_az, backup_n), (node_az, node_n) in zip(azs_in_backup.most_common(), azs_in_nodes.most_common()): if backup_n > node_n: missing_n = backup_n - node_n - raise exceptions.InsufficientNodesException( + raise exceptions.ImbalancedAZsException( f"AZ {node_az}, to be restored from {backup_az}, is missing {missing_n} nodes" ) diff --git a/astacus/coordinator/plugins/cassandra/plugin.py b/astacus/coordinator/plugins/cassandra/plugin.py index 220c1a05..e9f263c8 100644 --- a/astacus/coordinator/plugins/cassandra/plugin.py +++ b/astacus/coordinator/plugins/cassandra/plugin.py @@ -141,7 +141,7 @@ def get_restore_steps(self, *, context: OperationContext, req: ipc.RestoreReques base.BackupNameStep(json_storage=context.json_storage, requested_name=req.name), base.BackupManifestStep(json_storage=context.json_storage), restore_steps.ParsePluginManifestStep(), - base.MapNodesStep(partial_restore_nodes=req.partial_restore_nodes), + base.MapNodesStep(partial_restore_nodes=req.partial_restore_nodes, ignore_imbalanced_azs=True), 
CassandraRestoreSubOpStep(op=ipc.CassandraSubOp.stop_cassandra), CassandraRestoreSubOpStep(op=ipc.CassandraSubOp.unrestore_sstables), *cluster_restore_steps, diff --git a/tests/unit/coordinator/test_restore.py b/tests/unit/coordinator/test_restore.py index 4db75fd5..e6e6a9ef 100644 --- a/tests/unit/coordinator/test_restore.py +++ b/tests/unit/coordinator/test_restore.py @@ -162,28 +162,113 @@ def match_clear(request: httpx.Request) -> httpx.Response | None: assert app.state.coordinator_state.op_info.op_id == 1 +@dataclass(frozen=True) +class NodeToBackupCase: + node_azlist: list[str] + backup_azlist: list[str] + ignore_imbalanced_azs: bool + expected_index: NodeBackupIndices | None + exception: AbstractContextManager + name: str + + @pytest.mark.parametrize( - "node_azlist,backup_azlist,expected_index,exception", + "case", [ # successful cases - ([], [], [], does_not_raise()), - (["foo", "foo", "bar"], ["1", "2", "2"], [1, 2, 0], does_not_raise()), - (["a", "bb", "bb", "ccc", "ccc", "ccc"], ["1", "2", "2"], [None, 0, None, 1, 2, None], does_not_raise()), - (["a", "bb", "bb", "ccc", "ccc", "ccc"], ["3", "3", "3", "1", "2", "2"], [3, 4, 5, 0, 1, 2], does_not_raise()), + NodeToBackupCase( + node_azlist=[], + backup_azlist=[], + ignore_imbalanced_azs=False, + expected_index=[], + exception=does_not_raise(), + name="no-nodes", + ), + NodeToBackupCase( + node_azlist=["foo", "foo", "bar"], + backup_azlist=["1", "2", "2"], + ignore_imbalanced_azs=False, + expected_index=[1, 2, 0], + exception=does_not_raise(), + name="matching-distribution", + ), + NodeToBackupCase( + node_azlist=["a", "bb", "bb", "ccc", "ccc", "ccc"], + backup_azlist=["1", "2", "2"], + ignore_imbalanced_azs=False, + expected_index=[None, 0, None, 1, 2, None], + exception=does_not_raise(), + name="overcomplete-node-set", + ), + NodeToBackupCase( + node_azlist=["a", "bb", "bb", "ccc", "ccc", "ccc"], + backup_azlist=["3", "3", "3", "1", "2", "2"], + ignore_imbalanced_azs=False, + expected_index=[ + 3, + 4, + 
5, + 0, + 1, + 2, + ], + exception=does_not_raise(), + name="matching-distribution-3azs", + ), # errors - (["foo", "foo"], ["1", "2", "2"], None, pytest.raises(exceptions.InsufficientNodesException)), - (["foo", "foo", "foo"], ["1", "2", "2"], None, pytest.raises(exceptions.InsufficientAZsException)), - (["foo", "foo", "bar", "bar"], ["1", "3", "3", "3"], None, pytest.raises(exceptions.InsufficientNodesException)), + NodeToBackupCase( + node_azlist=["foo", "foo"], + backup_azlist=["1", "2", "2"], + ignore_imbalanced_azs=False, + expected_index=None, + exception=pytest.raises(exceptions.InsufficientNodesException), + name="insufficient-nodes", + ), + NodeToBackupCase( + node_azlist=["foo", "foo", "foo"], + backup_azlist=["1", "2", "2"], + ignore_imbalanced_azs=False, + expected_index=None, + exception=pytest.raises(exceptions.InsufficientAZsException), + name="insufficient-azs", + ), + NodeToBackupCase( + node_azlist=["foo", "foo", "bar", "bar"], + backup_azlist=["1", "3", "3", "3"], + ignore_imbalanced_azs=False, + expected_index=None, + exception=pytest.raises(exceptions.ImbalancedAZsException), + name="imbalanced-azs", + ), + # ignored errors (insufficient nodes raise, imbalanced AZs doesn't) + NodeToBackupCase( + node_azlist=["foo", "foo"], + backup_azlist=["1", "2", "2"], + ignore_imbalanced_azs=True, + expected_index=None, + exception=pytest.raises(exceptions.InsufficientNodesException), + name="insufficient-nodes-not-ignored", + ), + NodeToBackupCase( + node_azlist=["foo", "foo", "bar", "bar"], + backup_azlist=["1", "3", "3", "3"], + ignore_imbalanced_azs=True, + expected_index=[0, 1, 2, 3], + exception=does_not_raise(), + name="imbalanced-azs-ignored", + ), ], + ids=lambda case: case.name, ) -def test_node_to_backup_index( - node_azlist: list[str], backup_azlist: list[str], expected_index: list[int], exception: AbstractContextManager -) -> None: - snapshot_results = [ipc.SnapshotResult(az=az) for az in backup_azlist] - nodes = [CoordinatorNode(url="unused", 
az=az) for az in node_azlist] - with exception: - assert expected_index == get_node_to_backup_index( - partial_restore_nodes=None, snapshot_results=snapshot_results, nodes=nodes +def test_node_to_backup_index(case: NodeToBackupCase) -> None: + snapshot_results = [ipc.SnapshotResult(az=az) for az in case.backup_azlist] + nodes = [CoordinatorNode(url="unused", az=az) for az in case.node_azlist] + with case.exception: + assert case.expected_index == get_node_to_backup_index( + partial_restore_nodes=None, + snapshot_results=snapshot_results, + nodes=nodes, + ignore_imbalanced_azs=case.ignore_imbalanced_azs, )