Release 1.4.0 #372

Merged 72 commits on Aug 22, 2024

Commits (72)
fc41ab9
[MLOP-634] Butterfree dev workflow, set triggers for branches staging…
moromimay Feb 8, 2021
4be4ffe
[BUG] Fix Staging GithubActions Pipeline (#283)
moromimay Feb 8, 2021
a3a601b
Apply only wheel. (#285)
moromimay Feb 8, 2021
4339608
[BUG] Change version on setup.py to PyPI (#286)
moromimay Feb 9, 2021
a82433c
Keep milliseconds when using 'from_ms' argument in timestamp feature …
hmeretti Feb 9, 2021
dcbf540
Change trigger for pipeline staging (#287)
moromimay Feb 10, 2021
a0a9335
Create a dev package. (#288)
moromimay Feb 10, 2021
7427898
[MLOP-633] Butterfree dev workflow, update documentation (#281)
moromimay Feb 10, 2021
245eaa5
[MLOP-632] Butterfree dev workflow, automate release description (#279)
AlvaroMarquesAndrade Feb 11, 2021
d6ecfa4
[MLOP-636] Create migration classes (#282)
AlvaroMarquesAndrade Feb 18, 2021
32e24d6
[MLOP-635] Rebase Incremental Job/Interval Run branch for test on sel…
moromimay Feb 19, 2021
8da89ed
Allow slide selection (#293)
roelschr Feb 22, 2021
0df07ae
Fix Slide Duration Typo (#295)
AlvaroMarquesAndrade Feb 26, 2021
aeb7999
[MLOP-637] Implement diff method (#292)
moromimay Mar 8, 2021
9afc39c
[MLOP-640] Create CLI with migrate command (#298)
roelschr Mar 15, 2021
bf204f2
[MLOP-645] Implement query method, cassandra (#291)
AlvaroMarquesAndrade Mar 15, 2021
b518dbc
[MLOP-671] Implement get_schema on Spark client (#301)
AlvaroMarquesAndrade Mar 16, 2021
5fe4c40
[MLOP-648] Implement query method, metastore (#294)
AlvaroMarquesAndrade Mar 16, 2021
e8fc0da
Fix Validation Step (#302)
AlvaroMarquesAndrade Mar 22, 2021
3d93a09
[MLOP-647] [MLOP-646] Apply migrations (#300)
roelschr Mar 23, 2021
0d30932
[BUG] Apply create_partitions to historical validate (#303)
moromimay Mar 30, 2021
d607297
[BUG] Fix key path for validate read (#304)
moromimay Mar 30, 2021
3dcd975
[FIX] Add Partition types for Metastore (#305)
AlvaroMarquesAndrade Apr 1, 2021
8077d86
[MLOP-639] Track logs in S3 (#306)
moromimay Apr 1, 2021
6d2a8f9
[BUG] Change logging config (#307)
moromimay Apr 6, 2021
d2c5d39
Change solution for tracking logs (#308)
moromimay Apr 8, 2021
43392f4
Read and write consistency level options (#309)
github-felipe-caputo Apr 13, 2021
0f31164
Fix kafka reader. (#310)
moromimay Apr 14, 2021
e6f67e9
Fix path validate. (#311)
moromimay Apr 14, 2021
baa594b
Add local dc property (#312)
github-felipe-caputo Apr 16, 2021
a74f098
Remove metastore migrate (#313)
moromimay Apr 20, 2021
378f3a5
Fix link in our docs. (#315)
moromimay Apr 20, 2021
3b18b5a
[BUG] Fix Cassandra Connect Session (#316)
moromimay Apr 23, 2021
c46f171
Fix migration query. (#318)
moromimay Apr 26, 2021
bb124f5
Fix migration query add type key. (#319)
moromimay Apr 28, 2021
1c97316
Fix db-config condition (#321)
moromimay May 5, 2021
bb7ed77
MLOP-642 Document migration in Butterfree (#320)
roelschr May 7, 2021
5a0a622
[MLOP-702] Debug mode for Automate Migration (#322)
moromimay May 10, 2021
b1371f1
[MLOP-727] Improve logging messages (#325)
GaBrandao Jun 2, 2021
acf7022
[MLOP-728] Improve logging messages (#324)
moromimay Jun 2, 2021
d0bf61a
Fix method to generate agg feature name. (#326)
moromimay Jun 4, 2021
1cf0dbd
[MLOP-691] Include step to add partition to SparkMetastore during wr…
moromimay Jun 10, 2021
9f42f53
Add the missing link for H3 geohash (#330)
jdvala Jun 16, 2021
78927e3
Update README.md (#331)
Jul 30, 2021
43bb3a3
Update Github Actions Workflow runner (#332)
Aug 22, 2022
2593839
Delete sphinx version. (#334)
moromimay Dec 20, 2022
35bcd30
Update files to staging (#336)
moromimay Dec 21, 2022
3a73ed8
Revert "Update files to staging (#336)" (#337)
moromimay Jan 2, 2023
6b78a50
Less strict requirements (#333)
lecardozo Aug 16, 2023
2a19009
feat: optional row count validation (#340)
ralphrass Aug 18, 2023
ca1a16d
fix: parameter, libs (#341)
ralphrass Aug 18, 2023
60c7ee4
pre-release 1.2.2.dev0 (#342)
ralphrass Aug 21, 2023
f35d665
Rebase staging (#343)
ralphrass Aug 21, 2023
97e44fa
Rebase staging from master (#345)
ralphrass Aug 21, 2023
9bcca0e
feat(MLOP-1985): optional params (#347)
ralphrass Nov 13, 2023
512a0fe
pre-release 1.2.3 (#349)
ralphrass Nov 13, 2023
688a5b3
feat(MLOP-2145): add feature set creation script (#351)
ralphrass Apr 11, 2024
da91b49
Rebase staging from master (#354)
ralphrass Apr 25, 2024
887fbb2
feat(mlop-2269): bump versions (#355)
ralphrass May 29, 2024
5af8a05
fix: sphinx version (#356)
ralphrass Jun 3, 2024
cbda73d
fix: publish and dev versions (#359)
ralphrass Jun 7, 2024
2a5a6e8
feat(MLOP-2236): add NTZ (#360)
ralphrass Jun 14, 2024
6363e03
fix: cassandra configs (#364)
ralphrass Jun 20, 2024
81c2c17
fix: Cassandra config keys (#366)
ralphrass Jun 28, 2024
b1949cd
fix: new type (#368)
ralphrass Jun 28, 2024
12d5e98
Delete .checklist.yaml (#371)
fernandrone Aug 16, 2024
35dd929
Add Delta support (#370)
ralphrass Aug 19, 2024
77539ae
release 1.4.0
ralphrass Aug 20, 2024
69293a4
Merge branch 'master' into release/1.4.0
ralphrass Aug 21, 2024
f6c5db6
Fix dup code (#373)
ralphrass Aug 21, 2024
19d296b
Merge remote-tracking branch 'origin/staging' into release/1.4.0
ralphrass Aug 21, 2024
e5305f3
fix: deduplicate
ralphrass Aug 22, 2024
4 changes: 1 addition & 3 deletions .github/workflows/publish.yml
@@ -4,11 +4,9 @@ on:
paths:
- 'setup.py'


jobs:
Pipeline:
if: github.ref == 'refs/heads/master'

runs-on: ubuntu-latest

steps:
@@ -19,7 +17,7 @@ jobs:

- uses: actions/setup-java@v4
with:
-        java-version: '11'
+        java-version: '17'
distribution: microsoft

- uses: vemonet/setup-spark@v1
3 changes: 1 addition & 2 deletions .github/workflows/staging.yml
@@ -7,7 +7,6 @@ on:
jobs:
Pipeline:
if: github.ref == 'refs/heads/staging'

runs-on: ubuntu-latest

steps:
@@ -18,7 +17,7 @@ jobs:

- uses: actions/setup-java@v4
with:
-        java-version: '11'
+        java-version: '17'
distribution: microsoft

- uses: vemonet/setup-spark@v1
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -19,7 +19,7 @@ jobs:

- uses: actions/setup-java@v4
with:
-        java-version: '11'
+        java-version: '17'
distribution: microsoft

- uses: vemonet/setup-spark@v1
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,9 @@ Preferably use **Added**, **Changed**, **Removed** and **Fixed** topics in each

## [Unreleased]

## [1.4.0](https://github.com/quintoandar/butterfree/releases/tag/1.4.0)
* Add Delta support ([#370](https://github.com/quintoandar/butterfree/pull/370))

## [1.3.5](https://github.com/quintoandar/butterfree/releases/tag/1.3.5)
* Auto create feature sets ([#368](https://github.com/quintoandar/butterfree/pull/368))

2 changes: 1 addition & 1 deletion Makefile
@@ -36,7 +36,7 @@ minimum-requirements:

.PHONY: requirements
## install all requirements
-requirements: requirements-test requirements-lint dev-requirements minimum-requirements
+requirements: minimum-requirements dev-requirements requirements-test requirements-lint

.PHONY: ci-install
ci-install:
1 change: 1 addition & 0 deletions butterfree/clients/spark_client.py
@@ -30,6 +30,7 @@ def conn(self) -> SparkSession:
"""
if not self._session:
self._session = SparkSession.builder.getOrCreate()

return self._session

def read(
3 changes: 2 additions & 1 deletion butterfree/load/writers/__init__.py
@@ -1,8 +1,9 @@
"""Holds data loaders for historical and online feature store."""

from butterfree.load.writers.delta_writer import DeltaWriter
from butterfree.load.writers.historical_feature_store_writer import (
HistoricalFeatureStoreWriter,
)
from butterfree.load.writers.online_feature_store_writer import OnlineFeatureStoreWriter

-__all__ = ["HistoricalFeatureStoreWriter", "OnlineFeatureStoreWriter"]
+__all__ = ["HistoricalFeatureStoreWriter", "OnlineFeatureStoreWriter", "DeltaWriter"]
162 changes: 162 additions & 0 deletions butterfree/load/writers/delta_writer.py
@@ -0,0 +1,162 @@
from delta.tables import DeltaTable
from pyspark.sql.dataframe import DataFrame

from butterfree.clients import SparkClient
from butterfree.configs.logger import __logger

logger = __logger("delta_writer", True)


class DeltaWriter:
"""Control operations on Delta Tables.

    Responsible for merging and optimizing.
"""

@staticmethod
def _get_full_table_name(table, database):
if database:
return "{}.{}".format(database, table)
else:
return table

@staticmethod
def _convert_to_delta(client: SparkClient, table: str):
logger.info(f"Converting {table} to Delta...")
client.conn.sql(f"CONVERT TO DELTA {table}")
logger.info("Conversion done.")

@staticmethod
def merge(
client: SparkClient,
database: str,
table: str,
merge_on: list,
source_df: DataFrame,
when_not_matched_insert_condition: str = None,
when_matched_update_condition: str = None,
when_matched_delete_condition: str = None,
):
"""
        Merge a source dataframe into a Delta table.

By default, it will update when matched, and insert when
not matched (simple upsert).

You can change this behavior by setting:
- when_not_matched_insert_condition: it will only insert
when this specified condition is true
- when_matched_update_condition: it will only update when this
specified condition is true. You can refer to the columns
in the source dataframe as source.<column_name>, and the columns
in the target table as target.<column_name>.
- when_matched_delete_condition: it will add an operation to delete,
but only if this condition is true. Again, source and
target dataframe columns can be referred to respectively as
source.<column_name> and target.<column_name>
"""
try:
full_table_name = DeltaWriter._get_full_table_name(table, database)

table_exists = client.conn.catalog.tableExists(full_table_name)

if table_exists:
pd_df = client.conn.sql(
f"DESCRIBE TABLE EXTENDED {full_table_name}"
).toPandas()
provider = (
pd_df.reset_index()
.groupby(["col_name"])["data_type"]
.aggregate("first")
.Provider
)
table_is_delta = provider.lower() == "delta"

if not table_is_delta:
DeltaWriter()._convert_to_delta(client, full_table_name)

# For schema evolution
client.conn.conf.set(
"spark.databricks.delta.schema.autoMerge.enabled", "true"
)

target_table = DeltaTable.forName(client.conn, full_table_name)
join_condition = " AND ".join(
[f"source.{col} = target.{col}" for col in merge_on]
)
merge_builder = target_table.alias("target").merge(
source_df.alias("source"), join_condition
)
if when_matched_delete_condition:
merge_builder = merge_builder.whenMatchedDelete(
condition=when_matched_delete_condition
)

merge_builder.whenMatchedUpdateAll(
condition=when_matched_update_condition
).whenNotMatchedInsertAll(
condition=when_not_matched_insert_condition
).execute()
except Exception as e:
logger.error(f"Merge operation on {full_table_name} failed: {e}")

@staticmethod
def vacuum(table: str, retention_hours: int, client: SparkClient):
"""Vacuum a Delta table.

        Vacuum removes unused files (files not managed by Delta, plus
        files that are no longer in the latest table state).
        After a vacuum it is impossible to time travel to versions
        older than the retention period.
        The default retention is 7 days. Shorter retention periods
        trigger a warning unless
        spark.databricks.delta.retentionDurationCheck.enabled is set
        to false.
https://docs.databricks.com/en/sql/language-manual/delta-vacuum.html
"""

command = f"VACUUM {table} RETAIN {retention_hours} HOURS"
logger.info(f"Running vacuum with command {command}")
client.conn.sql(command)
logger.info(f"Vacuum successful for table {table}")

@staticmethod
def optimize(
client: SparkClient,
table: str = None,
z_order: list = None,
date_column: str = "timestamp",
from_date: str = None,
auto_compact: bool = False,
optimize_write: bool = False,
):
"""Optimize a Delta table.

        For auto-compaction and optimized writes, DBR >= 14.3 LTS
        and Delta >= 3.1.0 are MANDATORY.
        For z-ordering, DBR >= 13.3 LTS and Delta >= 2.0.0 are MANDATORY.
        Auto-compaction (recommended) reduces the small-file problem
        (overhead due to lots of metadata).
        Z-order by columns that are commonly used in query
        predicates and have high cardinality.
https://docs.delta.io/latest/optimizations-oss.html
"""

        if auto_compact:
            client.conn.conf.set(
                "spark.databricks.delta.autoCompact.enabled", "true"
            )

        if optimize_write:
            client.conn.conf.set(
                "spark.databricks.delta.optimizeWrite.enabled", "true"
            )

if table:
command = f"OPTIMIZE {table}"

            if from_date:
                command += f" WHERE {date_column} >= {from_date}"

if z_order:
command += f" ZORDER BY {','.join(z_order)}"

logger.info(f"Running optimize with command {command}...")
client.conn.sql(command)
logger.info(f"Optimize successful for table {table}.")
36 changes: 29 additions & 7 deletions butterfree/load/writers/historical_feature_store_writer.py
@@ -14,6 +14,7 @@
from butterfree.dataframe_service import repartition_df
from butterfree.hooks import Hook
from butterfree.hooks.schema_compatibility import SparkTableSchemaCompatibilityHook
from butterfree.load.writers.delta_writer import DeltaWriter
from butterfree.load.writers.writer import Writer
from butterfree.transform import FeatureSet

@@ -92,6 +93,15 @@ class HistoricalFeatureStoreWriter(Writer):
    improve query performance. The data is stored in partition folders in AWS S3
based on time (per year, month and day).

    >>> spark_client = SparkClient()
    >>> writer = HistoricalFeatureStoreWriter(merge_on=["id", "timestamp"])
    >>> writer.write(feature_set=feature_set,
    ...              dataframe=dataframe,
    ...              spark_client=spark_client)

    Setting merge_on makes the writer skip the plain dataframe write
    and perform a Delta merge (upsert) instead. Use it when the table
    already exists.
"""

PARTITION_BY = [
@@ -114,13 +124,15 @@ def __init__(
interval_mode: bool = False,
check_schema_hook: Optional[Hook] = None,
row_count_validation: bool = True,
merge_on: list = None,
):
super(HistoricalFeatureStoreWriter, self).__init__(
db_config or MetastoreConfig(),
debug_mode,
interval_mode,
False,
row_count_validation,
merge_on,
)
self.database = database or environment.get_variable(
"FEATURE_STORE_HISTORICAL_DATABASE"
@@ -141,6 +153,7 @@ def write(
            feature_set: object processed with feature set information.
dataframe: spark dataframe containing data from a feature set.
spark_client: client for spark connections with external services.
            merge_on: when set on the writer, the write is an upsert into a Delta table.

If the debug_mode is set to True, a temporary table with a name in the format:
historical_feature_store__{feature_set.name} will be created instead of writing
@@ -174,13 +187,22 @@

s3_key = os.path.join("historical", feature_set.entity, feature_set.name)

-        spark_client.write_table(
-            dataframe=dataframe,
-            database=self.database,
-            table_name=feature_set.name,
-            partition_by=self.PARTITION_BY,
-            **self.db_config.get_options(s3_key),
-        )
+        if self.merge_on:
+            DeltaWriter.merge(
+                client=spark_client,
+                database=self.database,
+                table=feature_set.name,
+                merge_on=self.merge_on,
+                source_df=dataframe,
+            )
+        else:
+            spark_client.write_table(
+                dataframe=dataframe,
+                database=self.database,
+                table_name=feature_set.name,
+                partition_by=self.PARTITION_BY,
+                **self.db_config.get_options(s3_key),
+            )

def _assert_validation_count(
self, table_name: str, written_count: int, dataframe_count: int
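And a sketch of the new merge_on path end to end, assuming feature_set and dataframe were produced earlier in a butterfree pipeline (both are hypothetical here):

from butterfree.clients import SparkClient
from butterfree.load.writers import HistoricalFeatureStoreWriter

spark_client = SparkClient()

# First load: no merge_on, so the writer appends a partitioned table.
HistoricalFeatureStoreWriter().write(
    feature_set=feature_set, dataframe=dataframe, spark_client=spark_client
)

# Later loads: the table exists, so configure the writer to upsert
# via Delta merge on the key columns instead of rewriting.
writer = HistoricalFeatureStoreWriter(merge_on=["id", "timestamp"])
writer.write(
    feature_set=feature_set, dataframe=dataframe, spark_client=spark_client
)

Making merge_on a constructor argument keeps write()'s signature stable across writers; the base Writer simply stores it, as the writer.py diff below shows.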
2 changes: 2 additions & 0 deletions butterfree/load/writers/writer.py
@@ -27,6 +27,7 @@ def __init__(
interval_mode: Optional[bool] = False,
write_to_entity: Optional[bool] = False,
row_count_validation: Optional[bool] = True,
merge_on: Optional[list] = None,
) -> None:
super().__init__()
self.db_config = db_config
@@ -35,6 +36,7 @@
self.interval_mode = interval_mode
self.write_to_entity = write_to_entity
self.row_count_validation = row_count_validation
self.merge_on = merge_on

def with_(
self, transformer: Callable[..., DataFrame], *args: Any, **kwargs: Any
2 changes: 2 additions & 0 deletions docs/source/butterfree.automated.rst
@@ -4,6 +4,8 @@ butterfree.automated package
Submodules
----------

butterfree.automated.feature\_set\_creation module
--------------------------------------------------

.. automodule:: butterfree.automated.feature_set_creation
:members:
1 change: 0 additions & 1 deletion docs/source/butterfree.constants.rst
@@ -54,7 +54,6 @@ butterfree.constants.spark\_constants module
:undoc-members:
:show-inheritance:


.. automodule:: butterfree.constants.spark_constants
:members:
:undoc-members:
11 changes: 11 additions & 0 deletions docs/source/butterfree.dataframe_service.rst
@@ -4,18 +4,29 @@ butterfree.dataframe\_service package
Submodules
----------

butterfree.dataframe\_service.incremental\_strategy module
----------------------------------------------------------

.. automodule:: butterfree.dataframe_service.incremental_strategy
:members:
:undoc-members:
:show-inheritance:

butterfree.dataframe\_service.partitioning module
-------------------------------------------------

.. automodule:: butterfree.dataframe_service.partitioning
:members:
:undoc-members:
:show-inheritance:

butterfree.dataframe\_service.repartition module
------------------------------------------------

.. automodule:: butterfree.dataframe_service.repartition
:members:
:undoc-members:
:show-inheritance:

.. automodule:: butterfree.dataframe_service.repartition
:members: