From 689296f8443d953625f3effb9fc493ccc48e2121 Mon Sep 17 00:00:00 2001
From: Gagan Trivedi
Date: Tue, 30 Jul 2024 11:56:06 +0530
Subject: [PATCH] feat(app_analytics): Add cache for feature evaluation

---
 api/app/settings/common.py                   |  4 +
 api/app_analytics/cache.py                   | 52 +++++++++-
 api/app_analytics/views.py                   | 34 ++-----
 .../test_unit_app_analytics_cache.py         | 96 ++++++++++++++++++-
 .../test_unit_app_analytics_views.py         | 50 ++--------
 5 files changed, 165 insertions(+), 71 deletions(-)

diff --git a/api/app/settings/common.py b/api/app/settings/common.py
index 948165862fac..9ccf7955e83b 100644
--- a/api/app/settings/common.py
+++ b/api/app/settings/common.py
@@ -330,6 +330,10 @@
 USE_POSTGRES_FOR_ANALYTICS = env.bool("USE_POSTGRES_FOR_ANALYTICS", default=False)
 USE_CACHE_FOR_USAGE_DATA = env.bool("USE_CACHE_FOR_USAGE_DATA", default=False)
 
+FEATURE_EVALUATION_CACHE_SECONDS = env.int(
+    "FEATURE_EVALUATION_CACHE_SECONDS", default=60
+)
+
 ENABLE_API_USAGE_TRACKING = env.bool("ENABLE_API_USAGE_TRACKING", default=True)
 
 if ENABLE_API_USAGE_TRACKING:
diff --git a/api/app_analytics/cache.py b/api/app_analytics/cache.py
index aea7c84f7184..a5858524f23d 100644
--- a/api/app_analytics/cache.py
+++ b/api/app_analytics/cache.py
@@ -1,4 +1,6 @@
-from app_analytics.tasks import track_request
+from app_analytics.tasks import track_feature_evaluation, track_request
+from app_analytics.track import track_feature_evaluation_influxdb
+from django.conf import settings
 from django.utils import timezone
 
 CACHE_FLUSH_INTERVAL = 60  # seconds
@@ -31,3 +33,51 @@ def track_request(self, resource: int, host: str, environment_key: str):
         self._cache[key] += 1
         if (timezone.now() - self._last_flushed_at).seconds > CACHE_FLUSH_INTERVAL:
             self._flush()
+
+
+class FeatureEvaluationCache:
+    def __init__(self):
+        self._cache = {}
+        self._last_flushed_at = timezone.now()
+
+    def _flush(self):
+        evaluation_data = {}
+        for (environment_id, feature_name), eval_count in self._cache.items():
+            if environment_id in evaluation_data:
+                evaluation_data[environment_id][feature_name] = eval_count
+            else:
+                evaluation_data[environment_id] = {feature_name: eval_count}
+
+        for environment_id, feature_evaluations in evaluation_data.items():
+            if settings.USE_POSTGRES_FOR_ANALYTICS:
+                track_feature_evaluation.delay(
+                    kwargs={
+                        "environment_id": environment_id,
+                        "feature_evaluations": feature_evaluations,
+                    }
+                )
+
+            elif settings.INFLUXDB_TOKEN:
+                track_feature_evaluation_influxdb.delay(
+                    kwargs={
+                        "environment_id": environment_id,
+                        "feature_evaluations": feature_evaluations,
+                    }
+                )
+
+        self._cache = {}
+        self._last_flushed_at = timezone.now()
+
+    def track_feature_evaluation(
+        self, environment_id: int, feature_name: str, evaluation_count: int
+    ):
+        key = (environment_id, feature_name)
+        if key not in self._cache:
+            self._cache[key] = evaluation_count
+        else:
+            self._cache[key] += evaluation_count
+
+        if (
+            timezone.now() - self._last_flushed_at
+        ).seconds > settings.FEATURE_EVALUATION_CACHE_SECONDS:
+            self._flush()
diff --git a/api/app_analytics/views.py b/api/app_analytics/views.py
index d89b76e3fb97..1f71efc33a2a 100644
--- a/api/app_analytics/views.py
+++ b/api/app_analytics/views.py
@@ -4,14 +4,9 @@
     get_total_events_count,
     get_usage_data,
 )
-from app_analytics.tasks import (
-    track_feature_evaluation,
-    track_feature_evaluation_v2,
-)
-from app_analytics.track import (
-    track_feature_evaluation_influxdb,
-    track_feature_evaluation_influxdb_v2,
-)
+from app_analytics.cache import FeatureEvaluationCache
+from app_analytics.tasks import track_feature_evaluation_v2
+from app_analytics.track import track_feature_evaluation_influxdb_v2
 from django.conf import settings
 from drf_yasg.utils import swagger_auto_schema
 from rest_framework import status
@@ -38,6 +33,7 @@
 )
 
 logger = logging.getLogger(__name__)
+feature_evaluation_cache = FeatureEvaluationCache()
 
 
 class SDKAnalyticsFlagsV2(CreateAPIView):
@@ -141,26 +137,10 @@ def post(self, request, *args, **kwargs):
             content_type="application/json",
             status=status.HTTP_200_OK,
         )
-
-        if settings.USE_POSTGRES_FOR_ANALYTICS:
-            track_feature_evaluation.delay(
-                args=(
-                    request.environment.id,
-                    request.data,
-                )
+        for feature_name, eval_count in self.request.data.items():
+            feature_evaluation_cache.track_feature_evaluation(
+                request.environment.id, feature_name, eval_count
             )
-        elif settings.INFLUXDB_TOKEN:
-            # Due to load issues on the task processor, we
-            # explicitly run this task in a separate thread.
-            # TODO: batch influx data to prevent large amounts
-            # of tasks.
-            track_feature_evaluation_influxdb.run_in_thread(
-                args=(
-                    request.environment.id,
-                    request.data,
-                )
-            )
-
         return Response(status=status.HTTP_200_OK)
 
     def _is_data_valid(self) -> bool:
diff --git a/api/tests/unit/app_analytics/test_unit_app_analytics_cache.py b/api/tests/unit/app_analytics/test_unit_app_analytics_cache.py
index de5f9114d589..e6e6cde9b042 100644
--- a/api/tests/unit/app_analytics/test_unit_app_analytics_cache.py
+++ b/api/tests/unit/app_analytics/test_unit_app_analytics_cache.py
@@ -1,7 +1,12 @@
-from app_analytics.cache import CACHE_FLUSH_INTERVAL, APIUsageCache
+from app_analytics.cache import (
+    CACHE_FLUSH_INTERVAL,
+    APIUsageCache,
+    FeatureEvaluationCache,
+)
 from app_analytics.models import Resource
 from django.utils import timezone
 from freezegun import freeze_time
+from pytest_django.fixtures import SettingsWrapper
 from pytest_mock import MockerFixture
 
 
@@ -71,3 +76,92 @@ def test_api_usage_cache(mocker: MockerFixture) -> None:
 
     # finally, make sure track_request task was not called
     assert not mocked_track_request_task.called
+
+
+def test_feature_evaluation_cache(
+    mocker: MockerFixture,
+    settings: SettingsWrapper,
+):
+    # Given
+    settings.FEATURE_EVALUATION_CACHE_SECONDS = 60
+    settings.USE_POSTGRES_FOR_ANALYTICS = False
+    settings.INFLUXDB_TOKEN = "token"
+
+    mocked_track_evaluation_task = mocker.patch(
+        "app_analytics.cache.track_feature_evaluation"
+    )
+    mocked_track_feature_evaluation_influxdb_task = mocker.patch(
+        "app_analytics.cache.track_feature_evaluation_influxdb"
+    )
+    environment_1_id = 1
+    environment_2_id = 2
+    feature_1_name = "feature_1_name"
+    feature_2_name = "feature_2_name"
+
+    cache = FeatureEvaluationCache()
+    now = timezone.now()
+
+    with freeze_time(now) as frozen_time:
+        # Track some feature evaluations
+        for _ in range(10):
+            cache.track_feature_evaluation(environment_1_id, feature_1_name, 1)
+            cache.track_feature_evaluation(environment_1_id, feature_2_name, 1)
+            cache.track_feature_evaluation(environment_2_id, feature_2_name, 1)
+
+        # Make sure the internal tasks were not called
+        assert not mocked_track_evaluation_task.delay.called
+        assert not mocked_track_feature_evaluation_influxdb_task.delay.called
+
+        # Now, let's move the time forward
+        frozen_time.tick(settings.FEATURE_EVALUATION_CACHE_SECONDS + 1)
+
+        # Track another evaluation (to trigger the cache flush)
+        cache.track_feature_evaluation(environment_1_id, feature_1_name, 1)
+
+        # Then
+        mocked_track_feature_evaluation_influxdb_task.delay.assert_has_calls(
+            [
+                mocker.call(
+                    kwargs={
"environment_id": environment_1_id, + "feature_evaluations": { + feature_1_name: 11, + feature_2_name: 10, + }, + }, + ), + mocker.call( + kwargs={ + "environment_id": environment_2_id, + "feature_evaluations": {feature_2_name: 10}, + }, + ), + ] + ) + # task responsible for tracking evaluation using postgres was not called + assert not mocked_track_evaluation_task.delay.called + + # Next, let's enable postgres tracking + settings.USE_POSTGRES_FOR_ANALYTICS = True + + # rest the mock + mocked_track_feature_evaluation_influxdb_task.reset_mock() + + # Track another evaluation + cache.track_feature_evaluation(environment_1_id, feature_1_name, 1) + + # move time forward again + frozen_time.tick(settings.FEATURE_EVALUATION_CACHE_SECONDS + 1) + + # track another one(to trigger cache flush) + cache.track_feature_evaluation(environment_1_id, feature_1_name, 1) + + # Assert that the call was made with only the data tracked after the flush interval. + mocked_track_evaluation_task.delay.assert_called_once_with( + kwargs={ + "environment_id": environment_1_id, + "feature_evaluations": {feature_1_name: 2}, + } + ) + # and the task for influx was not called + assert not mocked_track_feature_evaluation_influxdb_task.delay.called diff --git a/api/tests/unit/app_analytics/test_unit_app_analytics_views.py b/api/tests/unit/app_analytics/test_unit_app_analytics_views.py index a9276fa868ea..3486604586cf 100644 --- a/api/tests/unit/app_analytics/test_unit_app_analytics_views.py +++ b/api/tests/unit/app_analytics/test_unit_app_analytics_views.py @@ -36,8 +36,8 @@ def test_sdk_analytics_does_not_allow_bad_data(mocker, settings, environment): view = SDKAnalyticsFlags(request=request) - mocked_track_feature_eval = mocker.patch( - "app_analytics.views.track_feature_evaluation_influxdb" + mocked_feature_eval_cache = mocker.patch( + "app_analytics.views.feature_evaluation_cache" ) # When @@ -45,34 +45,7 @@ def test_sdk_analytics_does_not_allow_bad_data(mocker, settings, environment): # Then assert response.status_code == status.HTTP_200_OK - mocked_track_feature_eval.assert_not_called() - - -def test_sdk_analytics_allows_valid_data(mocker, settings, environment, feature): - # Given - settings.INFLUXDB_TOKEN = "some-token" - - data = {feature.name: 12} - request = mocker.MagicMock( - data=data, - environment=environment, - query_params={}, - ) - - view = SDKAnalyticsFlags(request=request) - - mocked_track_feature_eval = mocker.patch( - "app_analytics.views.track_feature_evaluation_influxdb" - ) - - # When - response = view.post(request) - - # Then - assert response.status_code == status.HTTP_200_OK - mocked_track_feature_eval.run_in_thread.assert_called_once_with( - args=(environment.id, data) - ) + mocked_feature_eval_cache.track_feature_evaluation.assert_not_called() def test_get_usage_data(mocker, admin_client, organisation): @@ -432,24 +405,20 @@ def test_set_sdk_analytics_flags_without_identifier( assert feature_evaluation_raw.evaluation_count is feature_request_count -def test_set_sdk_analytics_flags_v1_to_influxdb( +def test_sdk_analytics_flags_v1( api_client: APIClient, environment: Environment, feature: Feature, - identity: Identity, - settings: SettingsWrapper, mocker: MockerFixture, ) -> None: # Given - settings.INFLUXDB_TOKEN = "some-token" - url = reverse("api-v1:analytics-flags") api_client.credentials(HTTP_X_ENVIRONMENT_KEY=environment.api_key) feature_request_count = 2 data = {feature.name: feature_request_count} - mocked_track_feature_eval = mocker.patch( - 
"app_analytics.views.track_feature_evaluation_influxdb" + mocked_feature_evaluation_cache = mocker.patch( + "app_analytics.views.feature_evaluation_cache" ) # When @@ -459,9 +428,6 @@ def test_set_sdk_analytics_flags_v1_to_influxdb( # Then assert response.status_code == status.HTTP_200_OK - mocked_track_feature_eval.run_in_thread.assert_called_once_with( - args=( - environment.id, - data, - ) + mocked_feature_evaluation_cache.track_feature_evaluation.assert_called_once_with( + environment.id, feature.name, feature_request_count )