# --- api/app_analytics/migrate_to_pg.py ---
from app_analytics.constants import ANALYTICS_READ_BUCKET_SIZE
from app_analytics.influxdb_wrapper import influxdb_client, read_bucket
from app_analytics.models import FeatureEvaluationBucket


def migrate_feature_evaluations(migrate_till: int = 30) -> None:
    """Copy feature-evaluation counts from InfluxDB into the Postgres analytics tables.

    Walks backwards one day at a time, querying Influx for each 24-hour
    window and bulk-inserting the resulting records as
    ``FeatureEvaluationBucket`` rows.

    :param migrate_till: how many days of history to migrate, counting back
        from now (default 30).
    """
    query_api = influxdb_client.query_api()

    for day in range(migrate_till):
        # One 24h window per iteration: [-(day+1)d, -day d).
        # NOTE(review): Flux normally requires the bucket name quoted, i.e.
        # from(bucket: "name"); here it is interpolated unquoted — confirm
        # this query actually parses against the server. Also note the first
        # window's stop is "-0d" — verify Influx treats that as "now".
        query = (
            f"from (bucket: {read_bucket}) "
            f"|> range(start: -{day + 1}d, stop: -{day}d)"
        )

        result = query_api.query(query)

        # Flatten every record of every returned table into model instances.
        day_buckets = [
            FeatureEvaluationBucket(
                feature_name=record.values["feature_id"],
                bucket_size=ANALYTICS_READ_BUCKET_SIZE,
                created_at=record.get_time(),
                total_count=record.get_value(),
                environment_id=record.values["environment_id"],
            )
            for table in result
            for record in table.records
        ]
        # One bulk insert per day keeps memory bounded to a single window.
        FeatureEvaluationBucket.objects.bulk_create(day_buckets)


# --- api/app_analytics/management/commands/migrate_analytics.py ---
import argparse
from typing import Any

from django.core.management import BaseCommand


class Command(BaseCommand):
    """Management command wrapper around ``migrate_feature_evaluations``."""

    def add_arguments(self, parser: argparse.ArgumentParser) -> None:
        # --migrate-till N: migrate the last N days of data (default 30).
        parser.add_argument(
            "--migrate-till",
            type=int,
            dest="migrate_till",
            help="Migrate data till n days ago",
            default=30,
        )

    def handle(self, *args: Any, migrate_till: int, **options: Any) -> None:
        migrate_feature_evaluations(migrate_till=migrate_till)
import pytest
from app_analytics.migrate_to_pg import migrate_feature_evaluations
from app_analytics.models import FeatureEvaluationBucket
from django.conf import settings
from django.utils import timezone
from pytest_mock import MockerFixture


@pytest.mark.skipif(
    "analytics" not in settings.DATABASES,
    reason="Skip test if analytics database is not configured",
)
@pytest.mark.django_db(databases=["analytics", "default"])
def test_migrate_feature_evaluations(mocker: MockerFixture) -> None:
    """Three days of mocked Influx data become three Postgres bucket rows."""
    # Given
    feature_name = "test_feature_one"
    environment_id = "1"

    # Point the migration at a fake bucket name.
    read_bucket = "test_bucket"
    mocker.patch("app_analytics.migrate_to_pg.read_bucket", read_bucket)

    # Stub the influx client so each daily query yields one table holding
    # exactly one record.
    influx_client = mocker.patch("app_analytics.migrate_to_pg.influxdb_client")
    query_api = influx_client.query_api.return_value

    query_results = []
    for day in range(3):
        record = mocker.MagicMock(
            values={"feature_id": feature_name, "environment_id": environment_id},
            spec_set=["values", "get_time", "get_value"],
        )
        record.get_time.return_value = timezone.now() - timezone.timedelta(days=day)
        record.get_value.return_value = 100
        table = mocker.MagicMock(records=[record], spec_set=["records"])
        query_results.append([table])

    query_api.query.side_effect = query_results

    # When
    migrate_feature_evaluations(migrate_till=3)

    # Then - only 3 records should be created
    assert FeatureEvaluationBucket.objects.count() == 3
    matching = FeatureEvaluationBucket.objects.filter(
        feature_name=feature_name,
        environment_id=environment_id,
        bucket_size=15,
        total_count=100,
    )
    assert matching.count() == 3

    # And each of the three daily windows was queried exactly as expected.
    query_api.assert_has_calls(
        [
            mocker.call.query(
                f"from (bucket: {read_bucket}) |> range(start: -{day + 1}d, stop: -{day}d)"
            )
            for day in range(3)
        ]
    )