Merge pull request #186 from dbt-labs/making-rolling-secondary-calc-unbound

Let the bound be unbound
callum-mcdata authored Nov 18, 2022
2 parents 618c238 + 58008fd commit e962a58
Showing 7 changed files with 195 additions and 15 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20221117-162500.yaml
@@ -0,0 +1,7 @@
kind: Features
body: Making rolling unbound
time: 2022-11-17T16:25:00.850213-06:00
custom:
Author: callum-mcdata
Issue: "140"
PR: "186"
14 changes: 7 additions & 7 deletions README.md
@@ -265,14 +265,14 @@ Constructor: `metrics.period_to_date(aggregate, period [, alias, metric_list])`

## Rolling ([source](/macros/secondary_calculations/secondary_calculation_rolling.sql))

The rolling secondary calculation performs an aggregation on a defined number of rows in the metric dataset. For example, if the user selects the `week` grain and sets a rolling secondary calculation to `4` then the value returned will be a rolling 4 week calculation of whatever aggregation type was selected.
The rolling secondary calculation performs an aggregation on a number of rows in the metric dataset. For example, if the user selects the `week` grain and sets a rolling secondary calculation to `4` then the value returned will be a rolling 4 week calculation of whatever aggregation type was selected. If the `interval` input is not provided then the rolling calculation will be unbounded over all preceding rows.

Constructor: `metrics.rolling(aggregate, interval [, alias, metric_list])`
Constructor: `metrics.rolling(aggregate [, interval, alias, metric_list])`

| Input | Example | Description | Required |
| -------------------------- | ----------- | ----------- | -----------|
| `aggregate` | `max`, `average` | The aggregation to use in the window function. Options vary based on the primary aggregation and are enforced in [validate_aggregate_coherence()](/macros/secondary_calculations/validate_aggregate_coherence.sql). | Yes |
| `interval` | 1 | Integer - the number of time grains to look back | Yes |
| `interval` | 1 | Integer - the number of time grains to look back | No |
| `alias` | `month_to_date` | The column alias for the resulting calculation | No |
| `metric_list` | `base_sum_metric` | List of metrics that the secondary calculation should be applied to. Default is all metrics selected | No |
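The effect of omitting `interval` can be sketched outside the package; below is a hypothetical Python illustration (not the macro itself) of bounded vs. unbounded rolling sums over a weekly series:

```python
# Sketch of bounded vs. unbounded rolling sums over a weekly metric series.
# The sample values are hypothetical; `interval` mirrors the constructor input.

def rolling_sum(values, interval=None):
    """Rolling sum; with no interval, the window is unbounded preceding."""
    out = []
    for i in range(len(values)):
        start = 0 if interval is None else max(0, i - interval + 1)
        out.append(sum(values[start:i + 1]))
    return out

weekly = [2, 1, 3, 1, 1]
print(rolling_sum(weekly, interval=4))  # bounded 4-week window: [2, 3, 6, 7, 6]
print(rolling_sum(weekly))              # unbounded running total: [2, 3, 6, 7, 8]
```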

@@ -328,10 +328,10 @@ The metrics package contains validation on the configurations you're able to provide

Below is the list of metric configs currently accepted by this package.

| Config | Type | Accepted Values | Default Value | Description |
|-----------------------------|---------|-----------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `enabled` | boolean | True/False | True | Enables or disables a metric node. When disabled, dbt will not consider it as part of your project. |
| `treat_null_values_as_zero` | boolean | True/False | True | Controls the `coalesce` behavior for metrics. By default, when there are no observations for a metric, the output of the metric as well as period over period secondary calculations will include a `coalesce({{ field }}, 0)` to return 0's rather than nulls. Setting this config to False instead returns `NULL` values. |
| Config | Type | Accepted Values | Default Value | Description |
|--------|------|-----------------|---------------|-------------|
| `enabled` | boolean | True/False | True | Enables or disables a metric node. When disabled, dbt will not consider it as part of your project. |
| `treat_null_values_as_zero` | boolean | True/False | True | Controls the `coalesce` behavior for metrics. By default, when there are no observations for a metric, the output of the metric as well as period over period secondary calculations will include a `coalesce({{ field }}, 0)` to return 0's rather than nulls. Setting this config to False instead returns `NULL` values. |

## All_Time Grain

@@ -3,15 +3,15 @@ from
{{ metrics.calculate(
metric('base_count_metric'),
grain='month',
start_date = '2021-01-01',
end_date = '2021-04-01',
start_date = '2022-01-01',
end_date = '2022-04-01',
secondary_calculations=[
{"calculation": "period_over_period", "interval": 1, "comparison_strategy": "difference", "alias": "pop_1mth"},
{"calculation": "period_over_period", "interval": 1, "comparison_strategy": "ratio"},
{"calculation": "period_to_date", "aggregate": "sum", "period": "year", "alias": "ytd_sum"},
{"calculation": "period_to_date", "aggregate": "max", "period": "month"},
{"calculation": "rolling", "interval": 3, "aggregate": "average", "alias": "avg_3mth"},
{"calculation": "rolling", "interval": 3, "aggregate": "sum"},
{"calculation": "rolling", "aggregate": "sum"},
]
)
}}
@@ -23,9 +23,17 @@

{%- elif calc_type == 'rolling' %}
{%- if is_multiple_metrics -%}
{%- do return(metric_name ~ "_" ~ "rolling_" ~ calc_config.aggregate ~ "_" ~ calc_config.interval ~ "_" ~ grain) %}
{%- if calc_config.interval -%}
{%- do return(metric_name ~ "_" ~ "rolling_" ~ calc_config.aggregate ~ "_" ~ calc_config.interval ~ "_" ~ grain) %}
{%- else -%}
{%- do return(metric_name ~ "_" ~ "rolling_" ~ calc_config.aggregate) %}
{%- endif -%}
{%- else -%}
{%- do return("rolling_" ~ calc_config.aggregate ~ "_" ~ calc_config.interval ~ "_" ~ grain) %}
{%- if calc_config.interval -%}
{%- do return("rolling_" ~ calc_config.aggregate ~ "_" ~ calc_config.interval ~ "_" ~ grain) %}
{%- else -%}
{%- do return("rolling_" ~ calc_config.aggregate) %}
{%- endif -%}
{%- endif -%}

{%- elif calc_type == 'period_to_date' %}
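The alias naming in the branch above can be sketched in Python (a hypothetical illustration of the Jinja logic, not package code):

```python
# Hypothetical sketch of the rolling alias naming: the interval and grain are
# appended only when an interval is provided, mirroring the Jinja branches.

def rolling_alias(metric_name, aggregate, interval=None, grain=None,
                  is_multiple_metrics=False):
    parts = []
    if is_multiple_metrics:
        parts.append(metric_name)
    parts += ["rolling", aggregate]
    if interval:
        parts += [str(interval), grain]
    return "_".join(parts)

print(rolling_alias("rolling_count", "sum", is_multiple_metrics=True))
# → rolling_count_rolling_sum
print(rolling_alias("rolling_count", "sum", interval=3, grain="month",
                    is_multiple_metrics=True))
# → rolling_count_rolling_sum_3_month
```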
@@ -6,7 +6,11 @@
partition by {{ dimensions | join(", ") }}
{% endif -%}
order by date_{{grain}}
{% if calc_config.interval %}
rows between {{ calc_config.interval - 1 }} preceding and current row
{% else %}
rows between unbounded preceding and current row
{% endif %}
)
{% endset %}

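The window frame selection above can be mirrored in a small Python sketch (illustrative only, not the macro itself):

```python
# Sketch of the frame clause chosen above: bounded when an interval is given,
# otherwise unbounded preceding. An interval of N covers N rows including the
# current one, hence "N - 1 preceding".

def frame_clause(interval=None):
    if interval:
        return f"rows between {interval - 1} preceding and current row"
    return "rows between unbounded preceding and current row"

print(frame_clause(4))  # → rows between 3 preceding and current row
print(frame_clause())   # → rows between unbounded preceding and current row
```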
3 changes: 0 additions & 3 deletions macros/secondary_calculations_configuration/rolling.sql
@@ -4,9 +4,6 @@
{% if not aggregate %}
{% set _ = missing_args.append("aggregate") %}
{% endif %}
{% if not interval %}
{% set _ = missing_args.append("interval") %}
{% endif %}
{% if missing_args | length > 0 %}
{% do exceptions.raise_compiler_error( missing_args | join(", ") ~ ' not provided to rolling') %}
{% endif %}
@@ -0,0 +1,164 @@
import os
import pytest
from dbt.tests.util import run_dbt

# our file contents
from tests.functional.fixtures import (
fact_orders_source_csv,
fact_orders_sql,
fact_orders_yml,
custom_calendar_sql
)

# models/rolling_count.sql
rolling_count_sql = """
select *
from
{{ metrics.calculate(metric('rolling_count'),
grain='week',
secondary_calculations=[
metrics.rolling(aggregate="min"),
metrics.rolling(aggregate="max"),
metrics.rolling(aggregate="sum"),
metrics.rolling(aggregate="average")
]
)
}}
"""

# models/rolling_count.yml
rolling_count_yml = """
version: 2
models:
- name: rolling_count
tests:
- metrics.metric_equality:
compare_model: ref('rolling_count__expected')
metrics:
- name: rolling_count
model: ref('fact_orders')
label: Count Distinct
timestamp: order_date
time_grains: [day, week, month]
calculation_method: count
expression: customer_id
dimensions:
- had_discount
- order_country
"""

# seeds/rolling_count__expected.csv
if os.getenv('dbt_target') == 'snowflake':
    rolling_count__expected_csv = """
date_week,rolling_count,rolling_count_rolling_min,rolling_count_rolling_max,rolling_count_rolling_sum,rolling_count_rolling_average
2022-01-03,2,2,2,2,2.000
2022-01-10,1,1,2,3,1.500
2022-01-17,3,1,3,6,2.000
2022-01-24,1,1,3,7,1.750
2022-01-31,1,1,3,8,1.600
2022-02-07,1,1,3,9,1.500
2022-02-14,1,1,3,10,1.428
""".lstrip()
else:
    rolling_count__expected_csv = """
date_week,rolling_count,rolling_count_rolling_min,rolling_count_rolling_max,rolling_count_rolling_sum,rolling_count_rolling_average
2022-01-03,2,2,2,2,2.0000000000000000
2022-01-10,1,1,2,3,1.5000000000000000
2022-01-17,3,1,3,6,2.0000000000000000
2022-01-24,1,1,3,7,1.7500000000000000
2022-01-31,1,1,3,8,1.6000000000000000
2022-02-07,1,1,3,9,1.5000000000000000
2022-02-14,1,1,3,10,1.4285714285714286
""".lstrip()

# seeds/rolling_count__expected.yml
if os.getenv('dbt_target') == 'bigquery':
    rolling_count__expected_yml = """
version: 2
seeds:
  - name: rolling_count__expected
    config:
      column_types:
        date_week: date
        rolling_count: INT64
        rolling_count_rolling_min: INT64
        rolling_count_rolling_max: INT64
        rolling_count_rolling_sum: INT64
        rolling_count_rolling_average: FLOAT64
""".lstrip()
else:
    rolling_count__expected_yml = """"""

class TestRollingCount:

    # configuration in dbt_project.yml
    # setting bigquery as table to get around query complexity
    # resource constraints with compounding views
    if os.getenv('dbt_target') == 'bigquery':
        @pytest.fixture(scope="class")
        def project_config_update(self):
            return {
                "name": "example",
                "models": {"+materialized": "table"},
                "vars": {
                    "dbt_metrics_calendar_model": "custom_calendar",
                    "custom_calendar_dimension_list": ["is_weekend"]
                }
            }
    else:
        @pytest.fixture(scope="class")
        def project_config_update(self):
            return {
                "name": "example",
                "models": {"+materialized": "view"}
            }

    # install current repo as package
    @pytest.fixture(scope="class")
    def packages(self):
        return {
            "packages": [
                {"local": os.getcwd()}
            ]
        }

    # everything that goes in the "seeds" directory
    @pytest.fixture(scope="class")
    def seeds(self):
        return {
            "fact_orders_source.csv": fact_orders_source_csv,
            "rolling_count__expected.csv": rolling_count__expected_csv,
            "rolling_count__expected.yml": rolling_count__expected_yml
        }

    # everything that goes in the "models" directory
    @pytest.fixture(scope="class")
    def models(self):
        return {
            "fact_orders.sql": fact_orders_sql,
            "fact_orders.yml": fact_orders_yml,
            "custom_calendar.sql": custom_calendar_sql,
            "rolling_count.sql": rolling_count_sql,
            "rolling_count.yml": rolling_count_yml
        }

    def test_build_completion(self, project):
        # running deps to install package
        results = run_dbt(["deps"])

        # seed seeds
        results = run_dbt(["seed"])
        assert len(results) == 2

        # initial run
        results = run_dbt(["run"])
        assert len(results) == 4

        # run the tests
        results = run_dbt(["test"])  # expect passing test
        assert len(results) == 1

        # validate that the results include pass
        result_statuses = sorted(r.status for r in results)
        assert result_statuses == ["pass"]
