[MLOP-248] HistoricalFeatureStoreWriter validation count threshold (#140)
rafaelleinio committed Apr 24, 2020
1 parent 61f0ab0 commit 05dec00
Showing 3 changed files with 64 additions and 8 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,9 @@ Preferably use **Added**, **Changed**, **Removed** and **Fixed** topics in each
### Added
* [MLOP-191] AggregatedTransform with filter option to use subset during aggregation ([#139](https://github.com/quintoandar/butterfree/pull/139))

### Changed
* [MLOP-248] HistoricalFeatureStoreWriter validation count threshold ([#140](https://github.com/quintoandar/butterfree/pull/140))

## [0.8.0](https://github.com/quintoandar/butterfree/releases/tag/0.8.0)
### Changed
* [PROPOSAL] Optimizing rolling window aggregations ([#134](https://github.com/quintoandar/butterfree/pull/134))
36 changes: 28 additions & 8 deletions butterfree/core/load/writers/historical_feature_store_writer.py
@@ -24,6 +24,12 @@ class HistoricalFeatureStoreWriter(Writer):
database: database to use in Spark metastore.
By default FEATURE_STORE_HISTORICAL_DATABASE environment variable.
num_partitions: value to use in repartition df before save.
validation_threshold: lower and upper tolerance to use in the count validation.
The default value is defined in the DEFAULT_VALIDATION_THRESHOLD property.
For example: with validation_threshold = 0.01 and a calculated count of
100000 records on the dataframe, no error is thrown if the feature store
returns a count of 99500, since that falls within the 1% tolerance.
Use validation_threshold = 0 to disable the tolerance in the validation.
Example:
Simple example regarding HistoricalFeatureStoreWriter class instantiation.
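
To make the tolerance arithmetic concrete, here is a minimal sketch in plain Python, using the illustrative counts from the docstring example above (not part of the library API):

```python
# Tolerance check as performed by _assert_validation_count (sketch).
validation_threshold = 0.01
written_count = 99_500     # count read back from the feature store
dataframe_count = 100_000  # count of the source feature set dataframe

lower_bound = (1 - validation_threshold) * written_count  # 98505.0
upper_bound = (1 + validation_threshold) * written_count  # 100495.0

# 100000 lies inside [98505.0, 100495.0], so no error is thrown.
assert lower_bound <= dataframe_count <= upper_bound
```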
@@ -74,12 +80,21 @@ class HistoricalFeatureStoreWriter(Writer):
columns.PARTITION_DAY,
]

def __init__(self, db_config=None, database=None, num_partitions=None):
DEFAULT_VALIDATION_THRESHOLD = 0.01

def __init__(
self,
db_config=None,
database=None,
num_partitions=None,
validation_threshold: float = DEFAULT_VALIDATION_THRESHOLD,
):
self.db_config = db_config or S3Config()
self.database = database or environment.get_variable(
"FEATURE_STORE_HISTORICAL_DATABASE"
)
self.num_partitions = num_partitions or DEFAULT_NUM_PARTITIONS
self.validation_threshold = validation_threshold
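
A brief usage sketch of the new argument; the import path is assumed from the module layout shown in this diff, and instantiation relies on the writer's usual S3Config and environment-variable defaults:

```python
# Assumed import path, mirroring butterfree/core/load/writers/.
from butterfree.core.load.writers.historical_feature_store_writer import (
    HistoricalFeatureStoreWriter,
)

# Default: 1% tolerance (DEFAULT_VALIDATION_THRESHOLD).
writer = HistoricalFeatureStoreWriter()

# Looser: accept up to 10% divergence between the counts.
loose_writer = HistoricalFeatureStoreWriter(validation_threshold=0.10)

# Strict: written and source counts must match exactly.
strict_writer = HistoricalFeatureStoreWriter(validation_threshold=0)
```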

def write(
self, feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient
@@ -103,6 +118,17 @@ def write(
**self.db_config.get_options(s3_key),
)

def _assert_validation_count(self, table_name, written_count, dataframe_count):
lower_bound = (1 - self.validation_threshold) * written_count
upper_bound = (1 + self.validation_threshold) * written_count
validation = lower_bound <= dataframe_count <= upper_bound
assert validation, (
"Data written to the Historical Feature Store and read back "
f"from {table_name} has a different count than the feature set dataframe. "
f"\nNumber of rows in {table_name}: {written_count}."
f"\nNumber of rows in the dataframe: {dataframe_count}."
)

def validate(
self, feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient
):
@@ -124,13 +150,7 @@ def validate(

written_count = spark_client.sql(query=query_count).collect().pop()["row"]
dataframe_count = dataframe.count()

assert written_count == dataframe_count, (
"Data written to the Historical Feature Store and read back "
f"from {table_name} has a different count than the feature set dataframe. "
f"\nNumber of rows in {table_name}: {written_count}."
f"\nNumber of rows in the dataframe: {dataframe_count}."
)
self._assert_validation_count(table_name, written_count, dataframe_count)
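
A minimal sketch of the refactored check in isolation, using a hypothetical table name and counts that mirror the unit tests below:

```python
writer = HistoricalFeatureStoreWriter(validation_threshold=0.10)

# Within the 10% band around the written count: passes silently.
writer._assert_validation_count("table", written_count=100, dataframe_count=108)

# Outside the band: raises AssertionError carrying both counts.
try:
    writer._assert_validation_count("table", written_count=100, dataframe_count=111)
except AssertionError as error:
    print(error)
```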

def _create_partitions(self, dataframe):
# create year partition column
33 changes: 33 additions & 0 deletions tests/unit/butterfree/core/load/writers/test_historical_feature_store_writer.py
@@ -141,3 +141,36 @@ def test__repartition_df(self, spark_session, spark_context):
assert input_df.select(spark_partition_id()).distinct().count() == 1
# Desired number of partitions
assert result_df.select(spark_partition_id()).distinct().count() == 200

@pytest.mark.parametrize(
"written_count, dataframe_count, threshold",
[(100, 101, None), (100, 99, None), (100, 108, 0.10), (100, 92, 0.10)],
)
def test__assert_validation_count(self, written_count, dataframe_count, threshold):
# arrange
writer = (
HistoricalFeatureStoreWriter(validation_threshold=threshold)
if threshold
else HistoricalFeatureStoreWriter()
)

# act and assert
writer._assert_validation_count("table", written_count, dataframe_count)

@pytest.mark.parametrize(
"written_count, dataframe_count, threshold",
[(100, 102, None), (100, 98, None), (100, 111, 0.10), (100, 88, 0.10)],
)
def test__assert_validation_count_error(
self, written_count, dataframe_count, threshold
):
# arrange
writer = (
HistoricalFeatureStoreWriter(validation_threshold=threshold)
if threshold
else HistoricalFeatureStoreWriter()
)

# act and assert
with pytest.raises(AssertionError):
writer._assert_validation_count("table", written_count, dataframe_count)
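
Note that in the parametrized cases a threshold of None falls through the ternary to the writer's default. As a quick sanity check of the boundary values, with the default 1% threshold the bounds around a written count of 100 are 99.0 and 101.0, so 101 is accepted while 102 (the first error case) is not:

```python
# Plain-Python recap of the boundary cases above (illustrative values only).
written_count, threshold = 100, 0.01
lower = (1 - threshold) * written_count  # 99.0
upper = (1 + threshold) * written_count  # 101.0
assert lower <= 101 <= upper             # passes under the default threshold
assert not (lower <= 102 <= upper)       # 102 is rejected under the default
```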
