Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally ignore imbalanced AZs on MapNode step #215

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions astacus/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ class InsufficientAZsException(PermanentException):
pass


class ImbalancedAZsException(PermanentException):
    """Raised when an AZ has fewer nodes than the backup AZ it is paired with for restore."""


class NotFoundException(PermanentException):
pass

Expand Down
49 changes: 33 additions & 16 deletions astacus/coordinator/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from astacus.coordinator.manifest import download_backup_manifest, download_backup_min_manifest
from collections import Counter
from collections.abc import Sequence, Set
from typing import Any, Counter as TCounter, Generic, TypeVar
from typing import Any, Counter as TCounter, Generic, TypeAlias, TypeVar

import dataclasses
import datetime
Expand All @@ -30,6 +30,8 @@
T = TypeVar("T")
StepResult_co = TypeVar("StepResult_co", covariant=True)

NodeBackupIndices: TypeAlias = Sequence[int | None]


class CoordinatorPlugin(AstacusModel):
def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]:
Expand Down Expand Up @@ -284,14 +286,15 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> ipc.BackupM


@dataclasses.dataclass
class MapNodesStep(Step[Sequence[int | None]]):
class MapNodesStep(Step[NodeBackupIndices]):
"""
Create an index mapping nodes from cluster configuration to nodes in the backup manifest.
"""

partial_restore_nodes: Sequence[ipc.PartialRestoreRequestNode] | None = None
ignore_imbalanced_azs: bool = False

async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[int | None]:
async def run_step(self, cluster: Cluster, context: StepsContext) -> NodeBackupIndices:
# AZ distribution should in theory be forced to match, but in
# practice it doesn't really matter. So we restore nodes 'as
# well as we can' and hope that is well enough (or whoever
Expand All @@ -305,6 +308,7 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[in
partial_restore_nodes=self.partial_restore_nodes,
snapshot_results=snapshot_results,
nodes=cluster.nodes,
ignore_imbalanced_azs=self.ignore_imbalanced_azs,
)


Expand Down Expand Up @@ -475,7 +479,7 @@ async def download_delta(
*,
nodes: Sequence[CoordinatorNode],
cluster: Cluster,
node_to_backup_index: Sequence[int | None],
node_to_backup_index: NodeBackupIndices,
delta_manifest: ipc.BackupManifest,
) -> None:
reqs: list[ipc.NodeRequest] = []
Expand Down Expand Up @@ -520,7 +524,7 @@ async def clear_delta(
*,
nodes: Sequence[CoordinatorNode],
cluster: Cluster,
node_to_backup_index: Sequence[int | None],
node_to_backup_index: NodeBackupIndices,
delta_manifest: ipc.BackupManifest,
) -> None:
reqs: list[ipc.NodeRequest] = []
Expand Down Expand Up @@ -674,7 +678,8 @@ def get_node_to_backup_index(
partial_restore_nodes: Sequence[ipc.PartialRestoreRequestNode] | None,
snapshot_results: Sequence[ipc.SnapshotResult],
nodes: Sequence[CoordinatorNode],
) -> Sequence[int | None]:
ignore_imbalanced_azs: bool = False,
) -> NodeBackupIndices:
if partial_restore_nodes:
return get_node_to_backup_index_from_partial_restore_nodes(
partial_restore_nodes=partial_restore_nodes,
Expand All @@ -692,21 +697,33 @@ def get_node_to_backup_index(
if len(azs_in_backup) > len(azs_in_nodes):
azs_missing = len(azs_in_backup) - len(azs_in_nodes)
raise exceptions.InsufficientAZsException(f"{azs_missing} az(s) missing - unable to restore backup")

return get_node_to_backup_index_from_azs(
snapshot_results=snapshot_results,
nodes=nodes,
azs_in_backup=azs_in_backup,
azs_in_nodes=azs_in_nodes,
)
try:
return get_node_to_backup_index_from_azs(
snapshot_results=snapshot_results,
nodes=nodes,
azs_in_backup=azs_in_backup,
azs_in_nodes=azs_in_nodes,
)
except exceptions.ImbalancedAZsException:
if not ignore_imbalanced_azs:
raise
modified_snapshot_results = [
msgspec.structs.replace(result, az=node.az) for result, node in zip(snapshot_results, nodes, strict=True)
]
return get_node_to_backup_index_from_azs(
snapshot_results=modified_snapshot_results,
nodes=nodes,
azs_in_backup=azs_in_nodes,
azs_in_nodes=azs_in_nodes,
)


def get_node_to_backup_index_from_partial_restore_nodes(
*,
partial_restore_nodes: Sequence[ipc.PartialRestoreRequestNode],
snapshot_results: Sequence[ipc.SnapshotResult],
nodes: Sequence[CoordinatorNode],
) -> Sequence[int | None]:
) -> NodeBackupIndices:
node_to_backup_index: list[int | None] = [None] * len(nodes)
hostname_to_backup_index: dict[str | None, int] = {}
url_to_node_index: dict[str | None, int] = {}
Expand Down Expand Up @@ -751,13 +768,13 @@ def get_node_to_backup_index_from_azs(
nodes: Sequence[CoordinatorNode],
azs_in_backup: TCounter[str],
azs_in_nodes: TCounter[str],
) -> Sequence[int | None]:
) -> NodeBackupIndices:
node_to_backup_index: list[int | None] = [None] * len(nodes)
# This is strictly speaking just best-effort assignment
for (backup_az, backup_n), (node_az, node_n) in zip(azs_in_backup.most_common(), azs_in_nodes.most_common()):
if backup_n > node_n:
missing_n = backup_n - node_n
raise exceptions.InsufficientNodesException(
raise exceptions.ImbalancedAZsException(
f"AZ {node_az}, to be restored from {backup_az}, is missing {missing_n} nodes"
)

Expand Down
2 changes: 1 addition & 1 deletion astacus/coordinator/plugins/cassandra/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def get_restore_steps(self, *, context: OperationContext, req: ipc.RestoreReques
base.BackupNameStep(json_storage=context.json_storage, requested_name=req.name),
base.BackupManifestStep(json_storage=context.json_storage),
restore_steps.ParsePluginManifestStep(),
base.MapNodesStep(partial_restore_nodes=req.partial_restore_nodes),
base.MapNodesStep(partial_restore_nodes=req.partial_restore_nodes, ignore_imbalanced_azs=True),
CassandraRestoreSubOpStep(op=ipc.CassandraSubOp.stop_cassandra),
CassandraRestoreSubOpStep(op=ipc.CassandraSubOp.unrestore_sstables),
*cluster_restore_steps,
Expand Down
3 changes: 2 additions & 1 deletion astacus/coordinator/plugins/m3db.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
CoordinatorPlugin,
ListHexdigestsStep,
MapNodesStep,
NodeBackupIndices,
OperationContext,
RestoreStep,
SnapshotStep,
Expand Down Expand Up @@ -191,7 +192,7 @@ def validate_m3_config(placement_nodes: Sequence[m3placement.M3PlacementNode], n
def rewrite_m3db_placement(
*,
key: ETCDKey,
node_to_backup_index: Sequence[int | None],
node_to_backup_index: NodeBackupIndices,
src_placement_nodes: Sequence[m3placement.M3PlacementNode],
dst_placement_nodes: Sequence[m3placement.M3PlacementNode],
nodes: Sequence[CoordinatorNode],
Expand Down
119 changes: 102 additions & 17 deletions tests/unit/coordinator/test_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from astacus.common.ipc import Plugin
from astacus.common.rohmustorage import MultiRohmuStorage
from astacus.coordinator.config import CoordinatorNode
from astacus.coordinator.plugins.base import get_node_to_backup_index
from astacus.coordinator.plugins.base import get_node_to_backup_index, NodeBackupIndices
from collections.abc import Callable
from contextlib import AbstractContextManager, nullcontext as does_not_raise
from dataclasses import dataclass
Expand Down Expand Up @@ -162,28 +162,113 @@ def match_clear(request: httpx.Request) -> httpx.Response | None:
assert app.state.coordinator_state.op_info.op_id == 1


@dataclass(frozen=True)
class NodeToBackupCase:
    """One parametrized scenario for test_node_to_backup_index."""

    # AZ of each cluster node; the test builds one CoordinatorNode per entry.
    node_azlist: list[str]
    # AZ of each backed-up node; the test builds one ipc.SnapshotResult per entry.
    backup_azlist: list[str]
    # Passed through to get_node_to_backup_index(ignore_imbalanced_azs=...).
    ignore_imbalanced_azs: bool
    # Expected return value of get_node_to_backup_index; None in the cases that expect an exception.
    expected_index: NodeBackupIndices | None
    # Context manager wrapped around the call: does_not_raise() or pytest.raises(...).
    exception: AbstractContextManager
    # Used as the pytest id of the case (see ids= on the parametrize decorator).
    name: str


@pytest.mark.parametrize(
"node_azlist,backup_azlist,expected_index,exception",
"case",
[
# successful cases
([], [], [], does_not_raise()),
(["foo", "foo", "bar"], ["1", "2", "2"], [1, 2, 0], does_not_raise()),
(["a", "bb", "bb", "ccc", "ccc", "ccc"], ["1", "2", "2"], [None, 0, None, 1, 2, None], does_not_raise()),
(["a", "bb", "bb", "ccc", "ccc", "ccc"], ["3", "3", "3", "1", "2", "2"], [3, 4, 5, 0, 1, 2], does_not_raise()),
NodeToBackupCase(
node_azlist=[],
backup_azlist=[],
ignore_imbalanced_azs=False,
expected_index=[],
exception=does_not_raise(),
name="no-nodes",
),
NodeToBackupCase(
node_azlist=["foo", "foo", "bar"],
backup_azlist=["1", "2", "2"],
ignore_imbalanced_azs=False,
expected_index=[1, 2, 0],
exception=does_not_raise(),
name="matching-distribution",
),
NodeToBackupCase(
node_azlist=["a", "bb", "bb", "ccc", "ccc", "ccc"],
backup_azlist=["1", "2", "2"],
ignore_imbalanced_azs=False,
expected_index=[None, 0, None, 1, 2, None],
exception=does_not_raise(),
name="overcomplete-node-set",
),
NodeToBackupCase(
node_azlist=["a", "bb", "bb", "ccc", "ccc", "ccc"],
backup_azlist=["3", "3", "3", "1", "2", "2"],
ignore_imbalanced_azs=False,
expected_index=[
3,
4,
5,
0,
1,
2,
],
exception=does_not_raise(),
name="matching-distribution-3azs",
),
# errors
(["foo", "foo"], ["1", "2", "2"], None, pytest.raises(exceptions.InsufficientNodesException)),
(["foo", "foo", "foo"], ["1", "2", "2"], None, pytest.raises(exceptions.InsufficientAZsException)),
(["foo", "foo", "bar", "bar"], ["1", "3", "3", "3"], None, pytest.raises(exceptions.InsufficientNodesException)),
NodeToBackupCase(
node_azlist=["foo", "foo"],
backup_azlist=["1", "2", "2"],
ignore_imbalanced_azs=False,
expected_index=None,
exception=pytest.raises(exceptions.InsufficientNodesException),
name="insufficient-nodes",
),
NodeToBackupCase(
node_azlist=["foo", "foo", "foo"],
backup_azlist=["1", "2", "2"],
ignore_imbalanced_azs=False,
expected_index=None,
exception=pytest.raises(exceptions.InsufficientAZsException),
name="insufficient-azs",
),
NodeToBackupCase(
node_azlist=["foo", "foo", "bar", "bar"],
backup_azlist=["1", "3", "3", "3"],
ignore_imbalanced_azs=False,
expected_index=None,
exception=pytest.raises(exceptions.ImbalancedAZsException),
name="imbalanced-azs",
),
# with ignore_imbalanced_azs=True: insufficient nodes still raises, imbalanced AZs no longer does
NodeToBackupCase(
node_azlist=["foo", "foo"],
backup_azlist=["1", "2", "2"],
ignore_imbalanced_azs=True,
expected_index=None,
exception=pytest.raises(exceptions.InsufficientNodesException),
name="insufficient-nodes-not-ignored",
),
NodeToBackupCase(
node_azlist=["foo", "foo", "bar", "bar"],
backup_azlist=["1", "3", "3", "3"],
ignore_imbalanced_azs=True,
expected_index=[0, 1, 2, 3],
exception=does_not_raise(),
name="imbalanced-azs-ignored",
),
],
ids=lambda case: case.name,
)
def test_node_to_backup_index(
node_azlist: list[str], backup_azlist: list[str], expected_index: list[int], exception: AbstractContextManager
) -> None:
snapshot_results = [ipc.SnapshotResult(az=az) for az in backup_azlist]
nodes = [CoordinatorNode(url="unused", az=az) for az in node_azlist]
with exception:
assert expected_index == get_node_to_backup_index(
partial_restore_nodes=None, snapshot_results=snapshot_results, nodes=nodes
def test_node_to_backup_index(case: NodeToBackupCase) -> None:
    """Check the node-to-backup index mapping, or the expected failure, for one case."""
    backup_results = [ipc.SnapshotResult(az=zone) for zone in case.backup_azlist]
    cluster_nodes = [CoordinatorNode(url="unused", az=zone) for zone in case.node_azlist]
    # case.exception is does_not_raise() for success cases and pytest.raises(...) for
    # failure cases, so the assertion is only reached when the call returns.
    with case.exception:
        actual_index = get_node_to_backup_index(
            partial_restore_nodes=None,
            snapshot_results=backup_results,
            nodes=cluster_nodes,
            ignore_imbalanced_azs=case.ignore_imbalanced_azs,
        )
        assert case.expected_index == actual_index


Expand Down
Loading