diff --git a/api/app/settings/common.py b/api/app/settings/common.py index 2763d7147cb12..60bcfde86354b 100644 --- a/api/app/settings/common.py +++ b/api/app/settings/common.py @@ -1197,3 +1197,8 @@ INSTALLED_APPS += ("split_testing",) ENABLE_API_USAGE_ALERTING = env.bool("ENABLE_API_USAGE_ALERTING", default=False) + +EDGE_V2_MIGRATION_READ_CAPACITY_BUDGET = env.int( + "EDGE_V2_MIGRATION_READ_CAPACITY_BUDGET", + default=10_000, +) diff --git a/api/environments/dynamodb/__init__.py b/api/environments/dynamodb/__init__.py index 26348bce0552b..b0db2e852e2f8 100644 --- a/api/environments/dynamodb/__init__.py +++ b/api/environments/dynamodb/__init__.py @@ -1,12 +1,14 @@ -from .types import DynamoProjectMetadata -from .wrappers import ( +from environments.dynamodb.types import DynamoProjectMetadata +from environments.dynamodb.wrappers import ( DynamoEnvironmentAPIKeyWrapper, DynamoEnvironmentV2Wrapper, DynamoEnvironmentWrapper, DynamoIdentityWrapper, ) +from environments.dynamodb.wrappers.exceptions import CapacityBudgetExceeded __all__ = ( + "CapacityBudgetExceeded", "DynamoEnvironmentAPIKeyWrapper", "DynamoEnvironmentV2Wrapper", "DynamoEnvironmentWrapper", diff --git a/api/environments/dynamodb/services.py b/api/environments/dynamodb/services.py index 23c9d9ecd48e4..7f2f9f61825ed 100644 --- a/api/environments/dynamodb/services.py +++ b/api/environments/dynamodb/services.py @@ -1,23 +1,42 @@ import logging +from decimal import Decimal from typing import Generator, Iterable from flag_engine.identities.models import IdentityModel from environments.dynamodb import ( + CapacityBudgetExceeded, DynamoEnvironmentV2Wrapper, DynamoIdentityWrapper, ) from environments.dynamodb.types import ( + EdgeV2MigrationResult, IdentityOverridesV2Changeset, IdentityOverrideV2, ) from environments.models import Environment +from projects.models import EdgeV2MigrationStatus from util.mappers import map_engine_feature_state_to_identity_override logger = logging.getLogger(__name__) -def migrate_environments_to_v2(project_id: int) -> IdentityOverridesV2Changeset | None: +def migrate_environments_to_v2( + project_id: int, + capacity_budget: Decimal, +) -> EdgeV2MigrationResult | None: + """ + Migrate project's environments to `environments_v2` table. + + :param project_id: ID of project to migrate. + :param capacity_budget: Max read capacity to spend when reading identities. Can be 0. + + :returns: `EdgeV2MigrationResult` object or `None`. + + If provided `capacity_budget` exceeded, including a budget of 0, + `EdgeV2MigrationResult.status` is set to `INCOMPLETE` and no identity overrides get + written to `environments_v2` table. + """ dynamo_wrapper_v2 = DynamoEnvironmentV2Wrapper() identity_wrapper = DynamoIdentityWrapper() @@ -26,39 +45,55 @@ def migrate_environments_to_v2(project_id: int) -> IdentityOverridesV2Changeset logger.info("Migrating environments to v2 for project %d", project_id) - environments_to_migrate = Environment.objects.filter(project_id=project_id) - identity_overrides_to_migrate = _iter_paginated_overrides( - identity_wrapper=identity_wrapper, - environments=environments_to_migrate, + environments_to_migrate = Environment.objects.filter_for_document_builder( + project_id=project_id ) - changeset = IdentityOverridesV2Changeset( - to_put=list(identity_overrides_to_migrate), to_delete=[] - ) - logger.info( - "Retrieved %d identity overrides to migrate for project %d", - len(changeset.to_put), - project_id, - ) + identity_overrides_changeset = IdentityOverridesV2Changeset(to_put=[], to_delete=[]) + result_status = EdgeV2MigrationStatus.COMPLETE + + try: + to_put = list( + _iter_paginated_overrides( + identity_wrapper=identity_wrapper, + environments=environments_to_migrate, + capacity_budget=capacity_budget, + ) + ) + except CapacityBudgetExceeded as exc: + result_status = EdgeV2MigrationStatus.INCOMPLETE + logger.warning("Incomplete migration for project %d", project_id, exc_info=exc) + else: + identity_overrides_changeset.to_put = to_put + logger.info( + "Retrieved %d identity overrides to migrate for project %d", + len(to_put), + project_id, + ) dynamo_wrapper_v2.write_environments(environments_to_migrate) - dynamo_wrapper_v2.update_identity_overrides(changeset) + dynamo_wrapper_v2.update_identity_overrides(identity_overrides_changeset) logger.info("Finished migrating environments to v2 for project %d", project_id) - return changeset + return EdgeV2MigrationResult( + identity_overrides_changeset=identity_overrides_changeset, + status=result_status, + ) def _iter_paginated_overrides( *, identity_wrapper: DynamoIdentityWrapper, environments: Iterable[Environment], + capacity_budget: Decimal, ) -> Generator[IdentityOverrideV2, None, None]: for environment in environments: environment_api_key = environment.api_key for item in identity_wrapper.iter_all_items_paginated( environment_api_key=environment_api_key, + capacity_budget=capacity_budget, ): - identity = IdentityModel.parse_obj(item) + identity = IdentityModel.model_validate(item) for feature_state in identity.identity_features: yield map_engine_feature_state_to_identity_override( feature_state=feature_state, diff --git a/api/environments/dynamodb/types.py b/api/environments/dynamodb/types.py index 552321777feca..28edc8c31556a 100644 --- a/api/environments/dynamodb/types.py +++ b/api/environments/dynamodb/types.py @@ -1,4 +1,5 @@ import enum +import typing from dataclasses import asdict, dataclass from datetime import datetime @@ -7,6 +8,9 @@ from flag_engine.features.models import FeatureStateModel from pydantic import BaseModel +if typing.TYPE_CHECKING: + from projects.models import EdgeV2MigrationStatus + project_metadata_table = None if settings.PROJECT_METADATA_TABLE_NAME_DYNAMO: @@ -90,3 +94,9 @@ class IdentityOverrideV2(BaseModel): class IdentityOverridesV2Changeset: to_delete: list[IdentityOverrideV2] to_put: list[IdentityOverrideV2] + + +@dataclass +class EdgeV2MigrationResult: + identity_overrides_changeset: IdentityOverridesV2Changeset + status: "EdgeV2MigrationStatus" diff --git a/api/projects/migrations/0024_add_project_edge_v2_migration_read_capacity_budget.py b/api/projects/migrations/0024_add_project_edge_v2_migration_read_capacity_budget.py new file mode 100644 index 0000000000000..4e84d77abb93a --- /dev/null +++ b/api/projects/migrations/0024_add_project_edge_v2_migration_read_capacity_budget.py @@ -0,0 +1,40 @@ +# Generated by Django 3.2.25 on 2024-05-03 01:52 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ( + "projects", + "0023_rename_identity_overrides_v2_migration_status_project_edge_v2_migration_status", + ), + ] + + operations = [ + migrations.AddField( + model_name="project", + name="edge_v2_migration_read_capacity_budget", + field=models.IntegerField( + default=None, + help_text="[Edge V2 migration] Read capacity budget override. If project migration was finished with `INCOMPLETE` status, you can set it to a higher value than `EDGE_V2_MIGRATION_READ_CAPACITY_BUDGET` setting before restarting the migration.", + null=True, + ), + ), + migrations.AlterField( + model_name="project", + name="edge_v2_migration_status", + field=models.CharField( + choices=[ + ("NOT_STARTED", "Not Started"), + ("IN_PROGRESS", "In Progress"), + ("COMPLETE", "Complete"), + ("INCOMPLETE", "Incomplete (identity overrides skipped)"), + ], + default="NOT_STARTED", + help_text="[Edge V2 migration] Project migration status. Set to `IN_PROGRESS` to trigger migration start.", + max_length=50, + ), + ), + ] diff --git a/api/projects/models.py b/api/projects/models.py index e28f2caf83943..544490b5d253a 100644 --- a/api/projects/models.py +++ b/api/projects/models.py @@ -39,6 +39,7 @@ class EdgeV2MigrationStatus(models.TextChoices): NOT_STARTED = "NOT_STARTED", "Not Started" IN_PROGRESS = "IN_PROGRESS", "In Progress" COMPLETE = "COMPLETE", "Complete" + INCOMPLETE = "INCOMPLETE", "Incomplete (identity overrides skipped)" class Project(LifecycleModelMixin, SoftDeleteExportableModel): @@ -88,6 +89,16 @@ class Project(LifecycleModelMixin, SoftDeleteExportableModel): # Note that the default is actually set dynamically by a lifecycle hook on create # since we need to know whether edge is enabled or not. default=EdgeV2MigrationStatus.NOT_STARTED, + help_text="[Edge V2 migration] Project migration status. Set to `IN_PROGRESS` to trigger migration start.", + ) + edge_v2_migration_read_capacity_budget = models.IntegerField( + null=True, + default=None, + help_text=( + "[Edge V2 migration] Read capacity budget override. If project migration was finished with " + "`INCOMPLETE` status, you can set it to a higher value than `EDGE_V2_MIGRATION_READ_CAPACITY_BUDGET` " + "setting before restarting the migration." + ), ) stale_flags_limit_days = models.IntegerField( default=30, diff --git a/api/projects/tasks.py b/api/projects/tasks.py index ee53c9dcd91fc..1c99990e660a0 100644 --- a/api/projects/tasks.py +++ b/api/projects/tasks.py @@ -1,3 +1,6 @@ +from decimal import Decimal + +from django.conf import settings from django.db import transaction from task_processor.decorators import register_task_handler @@ -13,12 +16,19 @@ def write_environments_to_dynamodb(project_id: int) -> None: @register_task_handler() def migrate_project_environments_to_v2(project_id: int) -> None: from environments.dynamodb.services import migrate_environments_to_v2 - from projects.models import EdgeV2MigrationStatus, Project + from projects.models import Project with transaction.atomic(): project = Project.objects.select_for_update().get(id=project_id) - if migrate_environments_to_v2(project_id=project_id): - project.edge_v2_migration_status = EdgeV2MigrationStatus.COMPLETE + + if (capacity_budget := project.edge_v2_migration_read_capacity_budget) is None: + capacity_budget = settings.EDGE_V2_MIGRATION_READ_CAPACITY_BUDGET + + if result := migrate_environments_to_v2( + project_id=project_id, + capacity_budget=Decimal(capacity_budget), + ): + project.edge_v2_migration_status = result.status project.save() diff --git a/api/tests/unit/environments/dynamodb/test_unit_services.py b/api/tests/unit/environments/dynamodb/test_unit_services.py index 6314ec4a77b89..2217c18a9c88a 100644 --- a/api/tests/unit/environments/dynamodb/test_unit_services.py +++ b/api/tests/unit/environments/dynamodb/test_unit_services.py @@ -1,3 +1,5 @@ +from decimal import Decimal + from mypy_boto3_dynamodb.service_resource import Table from pytest_mock import MockerFixture @@ -6,15 +8,18 @@ DynamoIdentityWrapper, ) from environments.dynamodb.services import migrate_environments_to_v2 +from environments.dynamodb.wrappers.exceptions import CapacityBudgetExceeded from environments.identities.models import Identity from environments.models import Environment from features.models import FeatureState +from projects.models import EdgeV2MigrationStatus from util.mappers import ( map_engine_feature_state_to_identity_override, map_engine_identity_to_identity_document, map_environment_to_environment_v2_document, map_identity_override_to_identity_override_document, map_identity_to_engine, + map_identity_to_identity_document, ) @@ -48,7 +53,10 @@ def test_migrate_environments_to_v2__environment_with_overrides__writes_expected ) # When - migrate_environments_to_v2(project_id=environment.project_id) + migrate_environments_to_v2( + project_id=environment.project_id, + capacity_budget=float("Inf"), + ) # Then results = flagsmith_environments_v2_table.scan()["Items"] @@ -73,8 +81,55 @@ def test_migrate_environments_to_v2__wrapper_disabled__does_not_write( ) # When - migrate_environments_to_v2(project_id=mocker.Mock()) + migrate_environments_to_v2( + project_id=mocker.Mock(), + capacity_budget=mocker.Mock(), + ) # Then mocked_dynamodb_identity_wrapper.return_value.assert_not_called() mocked_dynamodb_v2_wrapper.return_value.assert_not_called() + + +def test_migrate_environments_to_v2__capacity_budget_exceeded__returns_expected( + environment: Environment, + identity: Identity, + identity_featurestate: FeatureState, + mocker: MockerFixture, +) -> None: + # Given + expected_capacity_budget = Decimal(12) + mocked_dynamodb_identity_wrapper = mocker.MagicMock(spec=DynamoIdentityWrapper) + + def iter_all_items_paginated_gen_mock(**_): + yield map_identity_to_identity_document(identity) + raise CapacityBudgetExceeded(expected_capacity_budget, Decimal(13)) + + mocked_dynamodb_identity_wrapper.iter_all_items_paginated.side_effect = ( + iter_all_items_paginated_gen_mock + ) + mocker.patch( + "environments.dynamodb.services.DynamoIdentityWrapper", + autospec=True, + return_value=mocked_dynamodb_identity_wrapper, + ) + mocker.patch( + "environments.dynamodb.services.DynamoEnvironmentV2Wrapper", + autospec=True, + return_value=mocker.MagicMock(), + ) + + # When + result = migrate_environments_to_v2( + project_id=environment.project_id, + capacity_budget=expected_capacity_budget, + ) + + # Then + mocked_dynamodb_identity_wrapper.iter_all_items_paginated.assert_called_once_with( + environment_api_key=environment.api_key, + capacity_budget=expected_capacity_budget, + ) + + assert result.status == EdgeV2MigrationStatus.INCOMPLETE + assert result.identity_overrides_changeset.to_put == [] diff --git a/api/tests/unit/projects/test_unit_projects_tasks.py b/api/tests/unit/projects/test_unit_projects_tasks.py index 6daed131254bd..a983507bced36 100644 --- a/api/tests/unit/projects/test_unit_projects_tasks.py +++ b/api/tests/unit/projects/test_unit_projects_tasks.py @@ -1,8 +1,13 @@ +from decimal import Decimal + import pytest from pytest_django.fixtures import SettingsWrapper from pytest_mock import MockerFixture -from environments.dynamodb.types import IdentityOverridesV2Changeset +from environments.dynamodb.types import ( + EdgeV2MigrationResult, + IdentityOverridesV2Changeset, +) from environments.models import Environment from features.models import Feature from projects.models import EdgeV2MigrationStatus, Project @@ -19,6 +24,7 @@ def project_v2_migration_in_progress( project: Project, ) -> Project: project.edge_v2_migration_status = EdgeV2MigrationStatus.IN_PROGRESS + project.edge_v2_migration_read_capacity_budget = 12 project.save() return project @@ -27,7 +33,12 @@ def project_v2_migration_in_progress( "migrate_environments_to_v2_return_value, expected_status", ( ( - IdentityOverridesV2Changeset(to_put=[], to_delete=[]), + EdgeV2MigrationResult( + identity_overrides_changeset=IdentityOverridesV2Changeset( + to_put=[], to_delete=[] + ), + status=EdgeV2MigrationStatus.COMPLETE, + ), EdgeV2MigrationStatus.COMPLETE, ), ( @@ -39,7 +50,7 @@ def project_v2_migration_in_progress( def test_migrate_project_environments_to_v2__calls_expected( mocker: MockerFixture, project_v2_migration_in_progress: Project, - migrate_environments_to_v2_return_value: IdentityOverridesV2Changeset | None, + migrate_environments_to_v2_return_value: EdgeV2MigrationResult | None, expected_status: str, ): # Given @@ -59,6 +70,7 @@ def test_migrate_project_environments_to_v2__calls_expected( project_v2_migration_in_progress.refresh_from_db() mocked_migrate_environments_to_v2.assert_called_once_with( project_id=project_v2_migration_in_progress.id, + capacity_budget=project_v2_migration_in_progress.edge_v2_migration_read_capacity_budget, ) assert project_v2_migration_in_progress.edge_v2_migration_status == expected_status @@ -86,13 +98,43 @@ def test_migrate_project_environments_to_v2__expected_status_on_error( # Then mocked_migrate_environments_to_v2.assert_called_once_with( - project_id=project_v2_migration_in_progress.id + project_id=project_v2_migration_in_progress.id, + capacity_budget=Decimal( + project_v2_migration_in_progress.edge_v2_migration_read_capacity_budget + ), ) assert project_v2_migration_in_progress.edge_v2_migration_status == ( EdgeV2MigrationStatus.IN_PROGRESS ) +def test_migrate_project_environments_to_v2__project_capacity_budget_none__call_expected( + mocker: MockerFixture, + project_v2_migration_in_progress: Project, + settings: SettingsWrapper, +) -> None: + # Given + expected_capacity_budget = 33 + project_v2_migration_in_progress.edge_v2_migration_read_capacity_budget = None + project_v2_migration_in_progress.save() + settings.EDGE_V2_MIGRATION_READ_CAPACITY_BUDGET = expected_capacity_budget + + mocked_migrate_environments_to_v2 = mocker.patch( + "environments.dynamodb.services.migrate_environments_to_v2", + autospec=True, + return_value=None, + ) + + # When + migrate_project_environments_to_v2(project_id=project_v2_migration_in_progress.id) + + # Then + mocked_migrate_environments_to_v2.assert_called_once_with( + project_id=project_v2_migration_in_progress.id, + capacity_budget=Decimal(expected_capacity_budget), + ) + + def test_handle_cascade_delete( project: Project, environment: Environment,