
Commit

Merge remote-tracking branch 'origin/staging' into release/1.4.0
ralphrass committed Aug 21, 2024
2 parents 69293a4 + f6c5db6 commit 19d296b
Showing 2 changed files with 26 additions and 36 deletions.
9 changes: 9 additions & 0 deletions butterfree/load/writers/historical_feature_store_writer.py
@@ -93,6 +93,15 @@ class HistoricalFeatureStoreWriter(Writer):
improve query performance. The data is stored in partition folders in AWS S3
based on time (per year, month and day).
>>> spark_client = SparkClient()
>>> writer = HistoricalFeatureStoreWriter()
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client,
...              merge_on=["id", "timestamp"])
This procedure skips the dataframe write and activates Delta Merge instead.
Use it when the table already exists.
"""

PARTITION_BY = [
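The `merge_on` example in the docstring above turns the write into a Delta Lake merge rather than a plain write. As a rough sketch of that idea only (not butterfree's actual implementation; the function name, aliases, and `table_name` parameter are hypothetical), an upsert keyed on the merge columns could look like this with the public delta-spark API:

from typing import List

from delta.tables import DeltaTable
from pyspark.sql import DataFrame, SparkSession


def merge_into_existing_table(
    spark: SparkSession,
    dataframe: DataFrame,
    table_name: str,
    merge_on: List[str],
) -> None:
    # Build the join condition from the merge keys, e.g.
    # "target.id = source.id AND target.timestamp = source.timestamp".
    condition = " AND ".join(f"target.{key} = source.{key}" for key in merge_on)

    (
        DeltaTable.forName(spark, table_name)  # the target table must already exist
        .alias("target")
        .merge(dataframe.alias("source"), condition)
        .whenMatchedUpdateAll()       # update rows whose merge keys already exist
        .whenNotMatchedInsertAll()    # insert rows that are new
        .execute()
    )

Called with merge_on=["id", "timestamp"], a merge like this updates existing rows for a given id/timestamp pair and inserts new ones, which is why the docstring says the regular dataframe write is skipped.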
53 changes: 17 additions & 36 deletions tests/unit/butterfree/transform/conftest.py
@@ -16,6 +16,15 @@
from butterfree.transform.utils import Function


def create_dataframe(data, timestamp_col="ts"):
    """Convert a list of row dicts into a Spark DataFrame with a proper timestamp column."""
    pdf = ps.DataFrame.from_dict(data)
    df = pdf.to_spark()
    df = df.withColumn(
        TIMESTAMP_COLUMN, df[timestamp_col].cast(DataType.TIMESTAMP.spark)
    )
    return df


def make_dataframe(spark_context, spark_session):
data = [
{
@@ -54,11 +63,7 @@ def make_dataframe(spark_context, spark_session):
"nonfeature": 0,
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn(TIMESTAMP_COLUMN, df.ts.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data)


def make_filtering_dataframe(spark_context, spark_session):
@@ -71,11 +76,7 @@
{"id": 1, "ts": 6, "feature1": None, "feature2": None, "feature3": None},
{"id": 1, "ts": 7, "feature1": None, "feature2": None, "feature3": None},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn(TIMESTAMP_COLUMN, df.ts.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data)


def make_output_filtering_dataframe(spark_context, spark_session):
@@ -86,11 +87,7 @@
{"id": 1, "ts": 4, "feature1": 0, "feature2": 1, "feature3": 1},
{"id": 1, "ts": 6, "feature1": None, "feature2": None, "feature3": None},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn(TIMESTAMP_COLUMN, df.ts.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data)


def make_rolling_windows_agg_dataframe(spark_context, spark_session):
@@ -126,11 +123,7 @@ def make_rolling_windows_agg_dataframe(spark_context, spark_session):
"feature2__avg_over_1_week_rolling_windows": None,
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data, timestamp_col="timestamp")


def make_rolling_windows_hour_slide_agg_dataframe(spark_context, spark_session):
@@ -154,11 +147,7 @@ def make_rolling_windows_hour_slide_agg_dataframe(spark_context, spark_session):
"feature2__avg_over_1_day_rolling_windows": 500.0,
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data, timestamp_col="timestamp")


def make_multiple_rolling_windows_hour_slide_agg_dataframe(
@@ -202,11 +191,7 @@ def make_multiple_rolling_windows_hour_slide_agg_dataframe(
"feature2__avg_over_3_days_rolling_windows": 500.0,
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data, timestamp_col="timestamp")


def make_rolling_windows_hour_slide_agg_dataframe(spark_context, spark_session):
@@ -331,9 +316,7 @@ def make_fs_dataframe_with_distinct(spark_context, spark_session):
"h3": "86a8100efffffff",
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))
df = create_dataframe(data, "timestamp")

return df

@@ -361,9 +344,7 @@ def make_target_df_distinct(spark_context, spark_session):
"feature__sum_over_3_days_rolling_windows": None,
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))
df = create_dataframe(data, "timestamp")

return df

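As a usage note on the refactor above: fixtures in this conftest.py now only declare their rows and delegate the pandas-on-Spark conversion and timestamp cast to `create_dataframe`. A hypothetical new fixture (the name and rows below are illustrative, not part of this commit) would follow the same pattern:

def make_minimal_dataframe(spark_context, spark_session):
    # Only the raw rows are declared here; conversion to a Spark DataFrame and
    # the TIMESTAMP_COLUMN cast happen inside create_dataframe.
    data = [
        {"id": 1, "ts": 0, "feature1": 100, "feature2": 100},
        {"id": 1, "ts": 1, "feature1": 200, "feature2": 200},
    ]
    return create_dataframe(data)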

1 comment on commit 19d296b

@chip-n-dale (bot) commented on 19d296b, Aug 21, 2024

Hi @ralphrass!

The GitLeaks SecTool reported some possibly exposed credentials/secrets; how about giving them a look?

GitLeaks Alert Sync

In case of false positives, more information is available on GitLeaks FAQ
If you have any other problem or question during this process, contact us in the Security space on GChat!
