Skip to content

Commit

Permalink
Skip identities on Edge V2 migration when capacity budget exceeded
Browse files Browse the repository at this point in the history
- Add `EDGE_V2_MIGRATION_READ_CAPACITY_BUDGET` setting
- Add per-project read capacity budget
- Enable incomplete Edge V2 migrations when capacity budget exceeded
  • Loading branch information
khvn26 committed May 3, 2024
1 parent 740a702 commit a281a7b
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 27 deletions.
5 changes: 5 additions & 0 deletions api/app/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1197,3 +1197,8 @@
INSTALLED_APPS += ("split_testing",)

ENABLE_API_USAGE_ALERTING = env.bool("ENABLE_API_USAGE_ALERTING", default=False)

EDGE_V2_MIGRATION_READ_CAPACITY_BUDGET = env.int(
"EDGE_V2_MIGRATION_READ_CAPACITY_BUDGET",
default=10_000,
)
6 changes: 4 additions & 2 deletions api/environments/dynamodb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from .types import DynamoProjectMetadata
from .wrappers import (
from environments.dynamodb.types import DynamoProjectMetadata
from environments.dynamodb.wrappers import (
DynamoEnvironmentAPIKeyWrapper,
DynamoEnvironmentV2Wrapper,
DynamoEnvironmentWrapper,
DynamoIdentityWrapper,
)
from environments.dynamodb.wrappers.exceptions import CapacityBudgetExceeded

__all__ = (
"CapacityBudgetExceeded",
"DynamoEnvironmentAPIKeyWrapper",
"DynamoEnvironmentV2Wrapper",
"DynamoEnvironmentWrapper",
Expand Down
67 changes: 51 additions & 16 deletions api/environments/dynamodb/services.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,42 @@
import logging
from decimal import Decimal
from typing import Generator, Iterable

from flag_engine.identities.models import IdentityModel

from environments.dynamodb import (
CapacityBudgetExceeded,
DynamoEnvironmentV2Wrapper,
DynamoIdentityWrapper,
)
from environments.dynamodb.types import (
EdgeV2MigrationResult,
IdentityOverridesV2Changeset,
IdentityOverrideV2,
)
from environments.models import Environment
from projects.models import EdgeV2MigrationStatus
from util.mappers import map_engine_feature_state_to_identity_override

logger = logging.getLogger(__name__)


def migrate_environments_to_v2(project_id: int) -> IdentityOverridesV2Changeset | None:
def migrate_environments_to_v2(
project_id: int,
capacity_budget: Decimal,
) -> EdgeV2MigrationResult | None:
"""
Migrate project's environments to `environments_v2` table.
:param project_id: ID of project to migrate.
:param capacity_budget: Max read capacity to spend when reading identities. Can be 0.
:returns: `EdgeV2MigrationResult` object or `None`.
If provided `capacity_budget` exceeded, including a budget of 0,
`EdgeV2MigrationResult.status` is set to `INCOMPLETE` and no identity overrides get
written to `environments_v2` table.
"""
dynamo_wrapper_v2 = DynamoEnvironmentV2Wrapper()
identity_wrapper = DynamoIdentityWrapper()

Expand All @@ -26,39 +45,55 @@ def migrate_environments_to_v2(project_id: int) -> IdentityOverridesV2Changeset

logger.info("Migrating environments to v2 for project %d", project_id)

environments_to_migrate = Environment.objects.filter(project_id=project_id)
identity_overrides_to_migrate = _iter_paginated_overrides(
identity_wrapper=identity_wrapper,
environments=environments_to_migrate,
environments_to_migrate = Environment.objects.filter_for_document_builder(
project_id=project_id
)

changeset = IdentityOverridesV2Changeset(
to_put=list(identity_overrides_to_migrate), to_delete=[]
)
logger.info(
"Retrieved %d identity overrides to migrate for project %d",
len(changeset.to_put),
project_id,
)
identity_overrides_changeset = IdentityOverridesV2Changeset(to_put=[], to_delete=[])
result_status = EdgeV2MigrationStatus.COMPLETE

try:
to_put = list(
_iter_paginated_overrides(
identity_wrapper=identity_wrapper,
environments=environments_to_migrate,
capacity_budget=capacity_budget,
)
)
except CapacityBudgetExceeded as exc:
result_status = EdgeV2MigrationStatus.INCOMPLETE
logger.warning("Incomplete migration for project %d", project_id, exc_info=exc)
else:
identity_overrides_changeset.to_put = to_put
logger.info(
"Retrieved %d identity overrides to migrate for project %d",
len(to_put),
project_id,
)

dynamo_wrapper_v2.write_environments(environments_to_migrate)
dynamo_wrapper_v2.update_identity_overrides(changeset)
dynamo_wrapper_v2.update_identity_overrides(identity_overrides_changeset)

logger.info("Finished migrating environments to v2 for project %d", project_id)
return changeset
return EdgeV2MigrationResult(
identity_overrides_changeset=identity_overrides_changeset,
status=result_status,
)


def _iter_paginated_overrides(
*,
identity_wrapper: DynamoIdentityWrapper,
environments: Iterable[Environment],
capacity_budget: Decimal,
) -> Generator[IdentityOverrideV2, None, None]:
for environment in environments:
environment_api_key = environment.api_key
for item in identity_wrapper.iter_all_items_paginated(
environment_api_key=environment_api_key,
capacity_budget=capacity_budget,
):
identity = IdentityModel.parse_obj(item)
identity = IdentityModel.model_validate(item)
for feature_state in identity.identity_features:
yield map_engine_feature_state_to_identity_override(
feature_state=feature_state,
Expand Down
10 changes: 10 additions & 0 deletions api/environments/dynamodb/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import enum
import typing
from dataclasses import asdict, dataclass
from datetime import datetime

Expand All @@ -7,6 +8,9 @@
from flag_engine.features.models import FeatureStateModel
from pydantic import BaseModel

if typing.TYPE_CHECKING:
from projects.models import EdgeV2MigrationStatus

project_metadata_table = None

if settings.PROJECT_METADATA_TABLE_NAME_DYNAMO:
Expand Down Expand Up @@ -90,3 +94,9 @@ class IdentityOverrideV2(BaseModel):
class IdentityOverridesV2Changeset:
to_delete: list[IdentityOverrideV2]
to_put: list[IdentityOverrideV2]


@dataclass
class EdgeV2MigrationResult:
identity_overrides_changeset: IdentityOverridesV2Changeset
status: "EdgeV2MigrationStatus"
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Generated by Django 3.2.25 on 2024-05-03 01:52

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
(
"projects",
"0023_rename_identity_overrides_v2_migration_status_project_edge_v2_migration_status",
),
]

operations = [
migrations.AddField(
model_name="project",
name="edge_v2_migration_read_capacity_budget",
field=models.IntegerField(
default=None,
help_text="[Edge V2 migration] Read capacity budget override. If project migration was finished with `INCOMPLETE` status, you can set it to a higher value than `EDGE_V2_MIGRATION_READ_CAPACITY_BUDGET` setting before restarting the migration.",
null=True,
),
),
migrations.AlterField(
model_name="project",
name="edge_v2_migration_status",
field=models.CharField(
choices=[
("NOT_STARTED", "Not Started"),
("IN_PROGRESS", "In Progress"),
("COMPLETE", "Complete"),
("INCOMPLETE", "Incomplete (identity overrides skipped)"),
],
default="NOT_STARTED",
help_text="[Edge V2 migration] Project migration status. Set to `IN_PROGRESS` to trigger migration start.",
max_length=50,
),
),
]
11 changes: 11 additions & 0 deletions api/projects/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class EdgeV2MigrationStatus(models.TextChoices):
NOT_STARTED = "NOT_STARTED", "Not Started"
IN_PROGRESS = "IN_PROGRESS", "In Progress"
COMPLETE = "COMPLETE", "Complete"
INCOMPLETE = "INCOMPLETE", "Incomplete (identity overrides skipped)"


class Project(LifecycleModelMixin, SoftDeleteExportableModel):
Expand Down Expand Up @@ -88,6 +89,16 @@ class Project(LifecycleModelMixin, SoftDeleteExportableModel):
# Note that the default is actually set dynamically by a lifecycle hook on create
# since we need to know whether edge is enabled or not.
default=EdgeV2MigrationStatus.NOT_STARTED,
help_text="[Edge V2 migration] Project migration status. Set to `IN_PROGRESS` to trigger migration start.",
)
edge_v2_migration_read_capacity_budget = models.IntegerField(
null=True,
default=None,
help_text=(
"[Edge V2 migration] Read capacity budget override. If project migration was finished with "
"`INCOMPLETE` status, you can set it to a higher value than `EDGE_V2_MIGRATION_READ_CAPACITY_BUDGET` "
"setting before restarting the migration."
),
)
stale_flags_limit_days = models.IntegerField(
default=30,
Expand Down
16 changes: 13 additions & 3 deletions api/projects/tasks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from decimal import Decimal

from django.conf import settings
from django.db import transaction

from task_processor.decorators import register_task_handler
Expand All @@ -13,12 +16,19 @@ def write_environments_to_dynamodb(project_id: int) -> None:
@register_task_handler()
def migrate_project_environments_to_v2(project_id: int) -> None:
from environments.dynamodb.services import migrate_environments_to_v2
from projects.models import EdgeV2MigrationStatus, Project
from projects.models import Project

with transaction.atomic():
project = Project.objects.select_for_update().get(id=project_id)
if migrate_environments_to_v2(project_id=project_id):
project.edge_v2_migration_status = EdgeV2MigrationStatus.COMPLETE

if (capacity_budget := project.edge_v2_migration_read_capacity_budget) is None:
capacity_budget = settings.EDGE_V2_MIGRATION_READ_CAPACITY_BUDGET

if result := migrate_environments_to_v2(
project_id=project_id,
capacity_budget=Decimal(capacity_budget),
):
project.edge_v2_migration_status = result.status
project.save()


Expand Down
59 changes: 57 additions & 2 deletions api/tests/unit/environments/dynamodb/test_unit_services.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from decimal import Decimal

from mypy_boto3_dynamodb.service_resource import Table
from pytest_mock import MockerFixture

Expand All @@ -6,15 +8,18 @@
DynamoIdentityWrapper,
)
from environments.dynamodb.services import migrate_environments_to_v2
from environments.dynamodb.wrappers.exceptions import CapacityBudgetExceeded
from environments.identities.models import Identity
from environments.models import Environment
from features.models import FeatureState
from projects.models import EdgeV2MigrationStatus
from util.mappers import (
map_engine_feature_state_to_identity_override,
map_engine_identity_to_identity_document,
map_environment_to_environment_v2_document,
map_identity_override_to_identity_override_document,
map_identity_to_engine,
map_identity_to_identity_document,
)


Expand Down Expand Up @@ -48,7 +53,10 @@ def test_migrate_environments_to_v2__environment_with_overrides__writes_expected
)

# When
migrate_environments_to_v2(project_id=environment.project_id)
migrate_environments_to_v2(
project_id=environment.project_id,
capacity_budget=float("Inf"),
)

# Then
results = flagsmith_environments_v2_table.scan()["Items"]
Expand All @@ -73,8 +81,55 @@ def test_migrate_environments_to_v2__wrapper_disabled__does_not_write(
)

# When
migrate_environments_to_v2(project_id=mocker.Mock())
migrate_environments_to_v2(
project_id=mocker.Mock(),
capacity_budget=mocker.Mock(),
)

# Then
mocked_dynamodb_identity_wrapper.return_value.assert_not_called()
mocked_dynamodb_v2_wrapper.return_value.assert_not_called()


def test_migrate_environments_to_v2__capacity_budget_exceeded__returns_expected(
environment: Environment,
identity: Identity,
identity_featurestate: FeatureState,
mocker: MockerFixture,
) -> None:
# Given
expected_capacity_budget = Decimal(12)
mocked_dynamodb_identity_wrapper = mocker.MagicMock(spec=DynamoIdentityWrapper)

def iter_all_items_paginated_gen_mock(**_):
yield map_identity_to_identity_document(identity)
raise CapacityBudgetExceeded(expected_capacity_budget, Decimal(13))

mocked_dynamodb_identity_wrapper.iter_all_items_paginated.side_effect = (
iter_all_items_paginated_gen_mock
)
mocker.patch(
"environments.dynamodb.services.DynamoIdentityWrapper",
autospec=True,
return_value=mocked_dynamodb_identity_wrapper,
)
mocker.patch(
"environments.dynamodb.services.DynamoEnvironmentV2Wrapper",
autospec=True,
return_value=mocker.MagicMock(),
)

# When
result = migrate_environments_to_v2(
project_id=environment.project_id,
capacity_budget=expected_capacity_budget,
)

# Then
mocked_dynamodb_identity_wrapper.iter_all_items_paginated.assert_called_once_with(
environment_api_key=environment.api_key,
capacity_budget=expected_capacity_budget,
)

assert result.status == EdgeV2MigrationStatus.INCOMPLETE
assert result.identity_overrides_changeset.to_put == []
Loading

0 comments on commit a281a7b

Please sign in to comment.