feat: Add api usage metrics for different periods (#3870)
zachaysan authored May 24, 2024
1 parent 1ae51a4 commit 50cc369
Showing 19 changed files with 661 additions and 61 deletions.
101 changes: 85 additions & 16 deletions api/app_analytics/analytics_db_service.py
@@ -1,4 +1,4 @@
-from datetime import date, timedelta
+from datetime import date, datetime, timedelta
 from typing import List
 
 from app_analytics.dataclasses import FeatureEvaluationData, UsageData
@@ -14,34 +14,103 @@
     FeatureEvaluationBucket,
     Resource,
 )
+from dateutil.relativedelta import relativedelta
 from django.conf import settings
 from django.db.models import Sum
 from django.utils import timezone
 
 from environments.models import Environment
 from features.models import Feature
 from organisations.models import Organisation
 
-ANALYTICS_READ_BUCKET_SIZE = 15
+from . import constants
+from .types import PERIOD_TYPE
 
 
 def get_usage_data(
-    organisation, environment_id=None, project_id=None
-) -> List[UsageData]:
+    organisation: Organisation,
+    environment_id: int | None = None,
+    project_id: int | None = None,
+    period: PERIOD_TYPE | None = None,
+) -> list[UsageData]:
+    now = timezone.now()
+
+    date_stop = date_start = None
+    period_starts_at = period_ends_at = None
+
+    match period:
+        case constants.CURRENT_BILLING_PERIOD:
+            if not getattr(organisation, "subscription_information_cache", None):
+                return []
+            sub_cache = organisation.subscription_information_cache
+            starts_at = sub_cache.current_billing_term_starts_at
+            month_delta = relativedelta(now, starts_at).months
+            period_starts_at = relativedelta(months=month_delta) + starts_at
+            period_ends_at = now
+            date_start = f"-{(now - period_starts_at).days}d"
+            date_stop = "now()"
+
+        case constants.PREVIOUS_BILLING_PERIOD:
+            if not getattr(organisation, "subscription_information_cache", None):
+                return []
+            sub_cache = organisation.subscription_information_cache
+            starts_at = sub_cache.current_billing_term_starts_at
+            month_delta = relativedelta(now, starts_at).months - 1
+            month_delta += relativedelta(now, starts_at).years * 12
+            period_starts_at = relativedelta(months=month_delta) + starts_at
+            period_ends_at = relativedelta(months=month_delta + 1) + starts_at
+            date_start = f"-{(now - period_starts_at).days}d"
+            date_stop = f"-{(now - period_ends_at).days}d"
+
+        case constants.NINETY_DAY_PERIOD:
+            period_starts_at = now - relativedelta(days=90)
+            period_ends_at = now
+            date_start = "-90d"
+            date_stop = "now()"
+
     if settings.USE_POSTGRES_FOR_ANALYTICS:
-        return get_usage_data_from_local_db(
-            organisation, environment_id=environment_id, project_id=project_id
-        )
-    return get_usage_data_from_influxdb(
-        organisation.id, environment_id=environment_id, project_id=project_id
-    )
+        kwargs = {
+            "organisation": organisation,
+            "environment_id": environment_id,
+            "project_id": project_id,
+        }
+
+        if period_starts_at:
+            assert period_ends_at
+            kwargs["date_start"] = period_starts_at
+            kwargs["date_stop"] = period_ends_at
+
+        return get_usage_data_from_local_db(**kwargs)
+
+    kwargs = {
+        "organisation_id": organisation.id,
+        "environment_id": environment_id,
+        "project_id": project_id,
+    }
+
+    if date_start:
+        assert date_stop
+        kwargs["date_start"] = date_start
+        kwargs["date_stop"] = date_stop
+
+    return get_usage_data_from_influxdb(**kwargs)
 
 
 def get_usage_data_from_local_db(
-    organisation, environment_id=None, project_id=None, period: int = 30
+    organisation: Organisation,
+    environment_id: int | None = None,
+    project_id: int | None = None,
+    date_start: datetime | None = None,
+    date_stop: datetime | None = None,
 ) -> List[UsageData]:
+    if date_start is None:
+        date_start = timezone.now() - timedelta(days=30)
+    if date_stop is None:
+        date_stop = timezone.now()
+
     qs = APIUsageBucket.objects.filter(
         environment_id__in=_get_environment_ids_for_org(organisation),
-        bucket_size=ANALYTICS_READ_BUCKET_SIZE,
+        bucket_size=constants.ANALYTICS_READ_BUCKET_SIZE,
     )
     if project_id:
         qs = qs.filter(project_id=project_id)
@@ -50,8 +119,8 @@ def get_usage_data_from_local_db(
 
     qs = (
         qs.filter(
-            created_at__date__lte=timezone.now(),
-            created_at__date__gt=timezone.now() - timedelta(days=30),
+            created_at__date__lte=date_stop,
+            created_at__date__gt=date_start,
         )
         .order_by("created_at__date")
         .values("created_at__date", "resource")
@@ -80,7 +149,7 @@ def get_total_events_count(organisation) -> int:
             environment_id__in=_get_environment_ids_for_org(organisation),
             created_at__date__lte=date.today(),
             created_at__date__gt=date.today() - timedelta(days=30),
-            bucket_size=ANALYTICS_READ_BUCKET_SIZE,
+            bucket_size=constants.ANALYTICS_READ_BUCKET_SIZE,
         ).aggregate(total_count=Sum("total_count"))["total_count"]
     else:
         count = get_events_for_organisation(organisation.id)
@@ -105,7 +174,7 @@ def get_feature_evaluation_data_from_local_db(
     feature_evaluation_data = (
         FeatureEvaluationBucket.objects.filter(
             environment_id=environment_id,
-            bucket_size=ANALYTICS_READ_BUCKET_SIZE,
+            bucket_size=constants.ANALYTICS_READ_BUCKET_SIZE,
             feature_name=feature.name,
             created_at__date__lte=timezone.now(),
             created_at__date__gt=timezone.now() - timedelta(days=period),
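For context, a sketch of the billing-period arithmetic the new get_usage_data() performs, with illustrative dates that are not part of the diff:

# Resolving PREVIOUS_BILLING_PERIOD by hand, assuming a billing term that
# started 2024-01-15 and a "now" of 2024-05-24 (illustrative values).
from datetime import datetime, timezone
from dateutil.relativedelta import relativedelta

now = datetime(2024, 5, 24, tzinfo=timezone.utc)
starts_at = datetime(2024, 1, 15, tzinfo=timezone.utc)

delta = relativedelta(now, starts_at)               # 4 months, 9 days
month_delta = delta.months - 1 + delta.years * 12   # 3
period_starts_at = relativedelta(months=month_delta) + starts_at      # 2024-04-15
period_ends_at = relativedelta(months=month_delta + 1) + starts_at    # 2024-05-15

# The InfluxDB path expresses the same window as day offsets from now:
date_start = f"-{(now - period_starts_at).days}d"   # "-39d"
date_stop = f"-{(now - period_ends_at).days}d"      # "-9d"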
6 changes: 6 additions & 0 deletions api/app_analytics/constants.py
@@ -0,0 +1,6 @@
+ANALYTICS_READ_BUCKET_SIZE = 15
+
+# get_usage_data() related period constants
+CURRENT_BILLING_PERIOD = "current_billing_period"
+PREVIOUS_BILLING_PERIOD = "previous_billing_period"
+NINETY_DAY_PERIOD = "90_day_period"
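app_analytics/types.py is among the changed files not rendered on this page; given the match arms above and the typing.get_args(PERIOD_TYPE) call in the serializer below, it is presumably a Literal over these three constants, along these lines:

# api/app_analytics/types.py (presumed shape, not rendered in this view)
from typing import Literal

PERIOD_TYPE = Literal[
    "current_billing_period",
    "previous_billing_period",
    "90_day_period",
]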
71 changes: 48 additions & 23 deletions api/app_analytics/influxdb_wrapper.py
@@ -76,7 +76,7 @@ def write(self):
 
     @staticmethod
     def influx_query_manager(
-        date_range: str = "30d",
+        date_start: str = "-30d",
         date_stop: str = "now()",
         drop_columns: typing.Tuple[str, ...] = DEFAULT_DROP_COLUMNS,
         filters: str = "|> filter(fn:(r) => r._measurement == 'api_call')",
@@ -88,7 +88,7 @@ def influx_query_manager(
 
         query = (
             f'from(bucket:"{bucket}")'
-            f" |> range(start: -{date_range}, stop: {date_stop})"
+            f" |> range(start: {date_start}, stop: {date_stop})"
             f" {filters}"
             f" |> drop(columns: {drop_columns_input})"
             f"{extra}"
@@ -103,7 +103,9 @@
             return []
 
 
-def get_events_for_organisation(organisation_id: id, date_range: str = "30d") -> int:
+def get_events_for_organisation(
+    organisation_id: id, date_start: str = "-30d", date_stop: str = "now()"
+) -> int:
     """
     Query influx db for usage for given organisation id
@@ -126,7 +128,8 @@ def get_events_for_organisation(organisation_id: id, date_range: str = "30d") ->
             "environment_id",
         ),
         extra="|> sum()",
-        date_range=date_range,
+        date_start=date_start,
+        date_stop=date_stop,
     )
 
     total = 0
@@ -137,7 +140,9 @@ def get_events_for_organisation(organisation_id: id, date_range: str = "30d") ->
     return total
 
 
-def get_event_list_for_organisation(organisation_id: int, date_range: str = "30d"):
+def get_event_list_for_organisation(
+    organisation_id: int, date_start: str = "-30d", date_stop: str = "now()"
+) -> tuple[dict[str, list[int]], list[str]]:
     """
     Query influx db for usage for given organisation id
@@ -149,14 +154,20 @@ def get_event_list_for_organisation(organisation_id: int, date_range: str = "30d
         filters=f'|> filter(fn:(r) => r._measurement == "api_call") \
         |> filter(fn: (r) => r["organisation_id"] == "{organisation_id}")',
         extra="|> aggregateWindow(every: 24h, fn: sum)",
-        date_range=date_range,
+        date_start=date_start,
+        date_stop=date_stop,
     )
     dataset = defaultdict(list)
     labels = []
     for result in results:
         for record in result.records:
             dataset[record["resource"]].append(record["_value"])
-            required_records = int(date_range[:-1]) + 1
+            if date_stop == "now()":
+                required_records = abs(int(date_start[:-1])) + 1
+            else:
+                required_records = (
+                    abs(int(date_start[:-1])) - abs(int(date_stop[:-1])) + 1
+                )
             if len(labels) != required_records:
                 labels.append(record.values["_time"].strftime("%Y-%m-%d"))
     return dataset, labels
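The new required_records branch can be sanity-checked by hand; for a previous-billing-period window the expected daily label count is endpoint-inclusive (values illustrative):

# date_start="-39d", date_stop="-9d" spans 39 - 9 = 30 days -> 31 daily labels
date_start, date_stop = "-39d", "-9d"
if date_stop == "now()":
    required_records = abs(int(date_start[:-1])) + 1
else:
    required_records = abs(int(date_start[:-1])) - abs(int(date_stop[:-1])) + 1
assert required_records == 31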
@@ -166,7 +177,9 @@ def get_multiple_event_list_for_organisation(
     organisation_id: int,
     project_id: int = None,
     environment_id: int = None,
-):
+    date_start: str = "-30d",
+    date_stop: str = "now()",
+) -> list[UsageData]:
     """
     Query influx db for usage for given organisation id
@@ -176,7 +189,6 @@
 
     :return: a number of requests for flags, traits, identities, environment-document
     """
-
     filters = [
         'r._measurement == "api_call"',
         f'r["organisation_id"] == "{organisation_id}"',
@@ -189,6 +201,8 @@
         filters.append(f'r["environment_id"] == "{environment_id}"')
 
     results = InfluxDBWrapper.influx_query_manager(
+        date_start=date_start,
+        date_stop=date_stop,
         filters=build_filter_string(filters),
         extra="|> aggregateWindow(every: 24h, fn: sum)",
     )
@@ -201,24 +215,33 @@
         for i, record in enumerate(result.records):
             dataset[i][record.values["resource"].capitalize()] = record.values["_value"]
             dataset[i]["name"] = record.values["_time"].strftime("%Y-%m-%d")
+
     return dataset
 
 
 def get_usage_data(
-    organisation_id: int, project_id: int = None, environment_id=None
-) -> typing.List[UsageData]:
+    organisation_id: int,
+    project_id: int | None = None,
+    environment_id: int | None = None,
+    date_start: str = "-30d",
+    date_stop: str = "now()",
+) -> list[UsageData]:
     events_list = get_multiple_event_list_for_organisation(
-        organisation_id, project_id, environment_id
+        organisation_id=organisation_id,
+        project_id=project_id,
+        environment_id=environment_id,
+        date_start=date_start,
+        date_stop=date_stop,
     )
     return UsageDataSchema(many=True).load(events_list)
 
 
 def get_multiple_event_list_for_feature(
     environment_id: int,
     feature_name: str,
-    period: str = "30d",
+    date_start: str = "-30d",
     aggregate_every: str = "24h",
-) -> typing.List[dict]:
+) -> list[dict]:
     """
     Get aggregated request data for the given feature in a given environment across
     all time, aggregated into time windows of length defined by the period argument.
@@ -237,14 +260,14 @@ def get_multiple_event_list_for_feature(
     :param environment_id: an id of the environment to get usage for
     :param feature_name: the name of the feature to get usage for
-    :param period: the influx time period to filter on, e.g. 30d, 7d, etc.
+    :param date_start: the influx time period to filter on, e.g. -30d, -7d, etc.
     :param aggregate_every: the influx time period to aggregate the data by, e.g. 24h
     :return: a list of dicts with feature and request count in a specific environment
     """
 
     results = InfluxDBWrapper.influx_query_manager(
-        date_range=period,
+        date_start=date_start,
         filters=f'|> filter(fn:(r) => r._measurement == "feature_evaluation") \
             |> filter(fn: (r) => r["_field"] == "request_count") \
             |> filter(fn: (r) => r["environment_id"] == "{environment_id}") \
@@ -271,16 +294,18 @@ def get_feature_evaluation_data(
     feature_name: str, environment_id: int, period: str = "30d"
 ) -> typing.List[FeatureEvaluationData]:
     data = get_multiple_event_list_for_feature(
-        feature_name=feature_name, environment_id=environment_id, period=period
+        feature_name=feature_name,
+        environment_id=environment_id,
+        date_start=f"-{period}",
     )
     return FeatureEvaluationDataSchema(many=True).load(data)
 
 
-def get_top_organisations(date_range: str, limit: str = ""):
+def get_top_organisations(date_start: str, limit: str = ""):
     """
     Query influx db top used organisations
-    :param date_range: data range for top organisations
+    :param date_start: Start of the date range for top organisations
     :param limit: limit for query
@@ -289,9 +314,9 @@ def get_top_organisations(date_range: str, limit: str = ""):
     if limit:
         limit = f"|> limit(n:{limit})"
 
-    bucket = range_bucket_mappings[date_range]
+    bucket = range_bucket_mappings[date_start]
     results = InfluxDBWrapper.influx_query_manager(
-        date_range=date_range,
+        date_start=date_start,
         bucket=bucket,
         filters='|> filter(fn:(r) => r._measurement == "api_call") \
         |> filter(fn: (r) => r["_field"] == "request_count")',
@@ -318,7 +343,7 @@ def get_top_organisations(date_range: str, limit: str = ""):
     return dataset
 
 
-def get_current_api_usage(organisation_id: int, date_range: str) -> int:
+def get_current_api_usage(organisation_id: int, date_start: str) -> int:
     """
     Query influx db for api usage
@@ -330,7 +355,7 @@ def get_current_api_usage(organisation_id: int, date_range: str) -> int:
 
     bucket = read_bucket
     results = InfluxDBWrapper.influx_query_manager(
-        date_range=date_range,
+        date_start=date_start,
         bucket=bucket,
         filters=build_filter_string(
             [
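The upshot of swapping date_range for date_start/date_stop is that influx_query_manager can now render a fully offset window instead of one always anchored at now(); roughly (bucket name and filters illustrative, not from the diff):

# Flux rendered by influx_query_manager for a previous-billing-period query:
date_start, date_stop = "-39d", "-9d"
query = (
    'from(bucket:"my_bucket")'
    f" |> range(start: {date_start}, stop: {date_stop})"
    ' |> filter(fn:(r) => r._measurement == "api_call")'
)
# -> from(bucket:"my_bucket") |> range(start: -39d, stop: -9d) |> filter(...)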
10 changes: 10 additions & 0 deletions api/app_analytics/serializers.py
@@ -1,5 +1,9 @@
+import typing
+
 from rest_framework import serializers
 
+from .types import PERIOD_TYPE
+
 
 class UsageDataSerializer(serializers.Serializer):
     flags = serializers.IntegerField()
@@ -12,6 +16,12 @@ class UsageDataQuerySerializer(serializers.Serializer):
 class UsageDataQuerySerializer(serializers.Serializer):
     project_id = serializers.IntegerField(required=False)
     environment_id = serializers.IntegerField(required=False)
+    period = serializers.ChoiceField(
+        choices=typing.get_args(PERIOD_TYPE),
+        allow_null=True,
+        default=None,
+        required=False,
+    )
 
 
 class UsageTotalCountSerializer(serializers.Serializer):
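Callers opt into a period via the query string, e.g. ?period=previous_billing_period; a sketch of the validation behaviour inside a configured Django/DRF environment (field values illustrative):

serializer = UsageDataQuerySerializer(
    data={"project_id": 12, "period": "previous_billing_period"}
)
assert serializer.is_valid()
assert serializer.validated_data["period"] == "previous_billing_period"

# Values outside PERIOD_TYPE are rejected by the ChoiceField:
assert not UsageDataQuerySerializer(data={"period": "fortnight"}).is_valid()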
2 changes: 1 addition & 1 deletion api/app_analytics/tasks.py
@@ -1,7 +1,7 @@
 from datetime import datetime, timedelta
 from typing import List, Tuple
 
-from app_analytics.analytics_db_service import ANALYTICS_READ_BUCKET_SIZE
+from app_analytics.constants import ANALYTICS_READ_BUCKET_SIZE
 from django.conf import settings
 from django.db.models import Count, Q, Sum
 from django.utils import timezone
