From 142a29bf6f6be1a94ad40c2a93c1a467811714c7 Mon Sep 17 00:00:00 2001 From: Ralph Filho Date: Fri, 4 Oct 2024 16:32:35 -0300 Subject: [PATCH 1/5] fix: performance adjustments, migrate --- .../database_migration/cassandra_migration.py | 27 +++++++++++++++--- .../transform/aggregated_feature_set.py | 28 +++++++++++-------- 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/butterfree/migrations/database_migration/cassandra_migration.py b/butterfree/migrations/database_migration/cassandra_migration.py index 5a4f755f..deef6cf5 100644 --- a/butterfree/migrations/database_migration/cassandra_migration.py +++ b/butterfree/migrations/database_migration/cassandra_migration.py @@ -86,11 +86,30 @@ def _get_alter_column_type_query(self, column: Diff, table_name: str) -> str: Alter column type query. """ - parsed_columns = self._get_parsed_columns([column]) + def _get_alter_column_type_query(self, column: Diff, table_name: str) -> str: + """Creates CQL statement to alter columns' types. + In Cassandra 3.4.x to 3.11.x alter type is not allowed. + This method creates a temp column to comply. + + Args: + columns: list of Diff objects with ALTER_TYPE kind. + table_name: table name. + + Returns: + Alter column type query. + + """ + + temp_column_name = f"{column.column}_temp" + + add_temp_column_query = f"ALTER TABLE {table_name} ADD {temp_column_name} {column.value};" + copy_data_to_temp_query = f"UPDATE {table_name} SET {temp_column_name} = {column.column};" + + drop_old_column_query = f"ALTER TABLE {table_name} DROP {column.column};" + rename_temp_column_query = f"ALTER TABLE {table_name} RENAME {temp_column_name} TO {column.column};" + + return f"{add_temp_column_query} {copy_data_to_temp_query} {drop_old_column_query} {rename_temp_column_query};" - return ( - f"ALTER TABLE {table_name} ALTER {parsed_columns.replace(' ', ' TYPE ')};" - ) @staticmethod def _get_create_table_query(columns: List[Dict[str, Any]], table_name: str) -> str: diff --git a/butterfree/transform/aggregated_feature_set.py b/butterfree/transform/aggregated_feature_set.py index 9f55ae93..813b73ce 100644 --- a/butterfree/transform/aggregated_feature_set.py +++ b/butterfree/transform/aggregated_feature_set.py @@ -576,14 +576,17 @@ def construct( pre_hook_df = self.run_pre_hooks(dataframe) - output_df = reduce( - lambda df, feature: feature.transform(df), - self.keys + [self.timestamp], - pre_hook_df, + # Apply transformations + for feature in self.keys + [self.timestamp]: + output_df = feature.transform(pre_hook_df) + + # Early filter data + output_df = self.incremental_strategy.filter_with_incremental_strategy( + dataframe=output_df, start_date=start_date, end_date=end_date ) if self._windows and end_date is not None: - # run aggregations for each window + # Run aggregations for each window agg_list = [ self._aggregate( dataframe=output_df, @@ -603,13 +606,12 @@ def construct( # keeping this logic to maintain the same behavior for already implemented # feature sets - if self._windows[0].slide == "1 day": base_df = self._get_base_dataframe( client=client, dataframe=output_df, end_date=end_date ) - # left join each aggregation result to our base dataframe + # Left join each aggregation result to our base dataframe output_df = reduce( lambda left, right: self._dataframe_join( left, @@ -635,19 +637,21 @@ def construct( else: output_df = self._aggregate(output_df, features=self.features) - output_df = self.incremental_strategy.filter_with_incremental_strategy( - dataframe=output_df, start_date=start_date, end_date=end_date - ) - output_df = output_df.select(*self.columns).replace( # type: ignore float("nan"), None ) + if not output_df.isStreaming and self.deduplicate_rows: output_df = self._filter_duplicated_rows(output_df) post_hook_df = self.run_post_hooks(output_df) + # Eager evaluation, only if needed and managable if not output_df.isStreaming and self.eager_evaluation: - post_hook_df.cache().count() + # Small dataframes only + if output_df.count() < 1_000_000: + post_hook_df.cache().count() + else: + post_hook_df.cache() # Cache without materialization for large volumes return post_hook_df From 32df079f718764ffbcf1cfddbb83ff7ef8dad92a Mon Sep 17 00:00:00 2001 From: Ralph Filho Date: Fri, 4 Oct 2024 17:06:03 -0300 Subject: [PATCH 2/5] fix: lint --- .../database_migration/cassandra_migration.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/butterfree/migrations/database_migration/cassandra_migration.py b/butterfree/migrations/database_migration/cassandra_migration.py index deef6cf5..9b804721 100644 --- a/butterfree/migrations/database_migration/cassandra_migration.py +++ b/butterfree/migrations/database_migration/cassandra_migration.py @@ -86,6 +86,7 @@ def _get_alter_column_type_query(self, column: Diff, table_name: str) -> str: Alter column type query. """ + def _get_alter_column_type_query(self, column: Diff, table_name: str) -> str: """Creates CQL statement to alter columns' types. In Cassandra 3.4.x to 3.11.x alter type is not allowed. @@ -102,15 +103,20 @@ def _get_alter_column_type_query(self, column: Diff, table_name: str) -> str: temp_column_name = f"{column.column}_temp" - add_temp_column_query = f"ALTER TABLE {table_name} ADD {temp_column_name} {column.value};" - copy_data_to_temp_query = f"UPDATE {table_name} SET {temp_column_name} = {column.column};" + add_temp_column_query = ( + f"ALTER TABLE {table_name} ADD {temp_column_name} {column.value};" + ) + copy_data_to_temp_query = ( + f"UPDATE {table_name} SET {temp_column_name} = {column.column};" + ) drop_old_column_query = f"ALTER TABLE {table_name} DROP {column.column};" - rename_temp_column_query = f"ALTER TABLE {table_name} RENAME {temp_column_name} TO {column.column};" + rename_temp_column_query = ( + f"ALTER TABLE {table_name} RENAME {temp_column_name} TO {column.column};" + ) return f"{add_temp_column_query} {copy_data_to_temp_query} {drop_old_column_query} {rename_temp_column_query};" - @staticmethod def _get_create_table_query(columns: List[Dict[str, Any]], table_name: str) -> str: """Creates CQL statement to create a table. From 5abc81eef5b48dcda6a82d8547870581eff9a91e Mon Sep 17 00:00:00 2001 From: Ralph Filho Date: Fri, 4 Oct 2024 17:25:21 -0300 Subject: [PATCH 3/5] fix: lint --- .../database_migration/cassandra_migration.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/butterfree/migrations/database_migration/cassandra_migration.py b/butterfree/migrations/database_migration/cassandra_migration.py index 9b804721..40bb5c55 100644 --- a/butterfree/migrations/database_migration/cassandra_migration.py +++ b/butterfree/migrations/database_migration/cassandra_migration.py @@ -78,17 +78,6 @@ def _get_alter_table_add_query(self, columns: List[Diff], table_name: str) -> st def _get_alter_column_type_query(self, column: Diff, table_name: str) -> str: """Creates CQL statement to alter columns' types. - Args: - columns: list of Diff objects with ALTER_TYPE kind. - table_name: table name. - - Returns: - Alter column type query. - - """ - - def _get_alter_column_type_query(self, column: Diff, table_name: str) -> str: - """Creates CQL statement to alter columns' types. In Cassandra 3.4.x to 3.11.x alter type is not allowed. This method creates a temp column to comply. @@ -100,7 +89,6 @@ def _get_alter_column_type_query(self, column: Diff, table_name: str) -> str: Alter column type query. """ - temp_column_name = f"{column.column}_temp" add_temp_column_query = ( @@ -115,7 +103,8 @@ def _get_alter_column_type_query(self, column: Diff, table_name: str) -> str: f"ALTER TABLE {table_name} RENAME {temp_column_name} TO {column.column};" ) - return f"{add_temp_column_query} {copy_data_to_temp_query} {drop_old_column_query} {rename_temp_column_query};" + return (f"{add_temp_column_query} {copy_data_to_temp_query} " + f"{drop_old_column_query} {rename_temp_column_query};") @staticmethod def _get_create_table_query(columns: List[Dict[str, Any]], table_name: str) -> str: From 8c865c6e1d632093cb7e7185192001528515cbbb Mon Sep 17 00:00:00 2001 From: Ralph Filho Date: Fri, 4 Oct 2024 17:31:05 -0300 Subject: [PATCH 4/5] fix: lint --- .../migrations/database_migration/cassandra_migration.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/butterfree/migrations/database_migration/cassandra_migration.py b/butterfree/migrations/database_migration/cassandra_migration.py index 40bb5c55..4d50746c 100644 --- a/butterfree/migrations/database_migration/cassandra_migration.py +++ b/butterfree/migrations/database_migration/cassandra_migration.py @@ -103,8 +103,10 @@ def _get_alter_column_type_query(self, column: Diff, table_name: str) -> str: f"ALTER TABLE {table_name} RENAME {temp_column_name} TO {column.column};" ) - return (f"{add_temp_column_query} {copy_data_to_temp_query} " - f"{drop_old_column_query} {rename_temp_column_query};") + return ( + f"{add_temp_column_query} {copy_data_to_temp_query} " + f"{drop_old_column_query} {rename_temp_column_query};" + ) @staticmethod def _get_create_table_query(columns: List[Dict[str, Any]], table_name: str) -> str: From cafe64dd6917990c5ac7a4840e95120c8c6befa2 Mon Sep 17 00:00:00 2001 From: Ralph Filho Date: Mon, 7 Oct 2024 21:10:12 -0300 Subject: [PATCH 5/5] fix: transformations --- butterfree/transform/aggregated_feature_set.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/butterfree/transform/aggregated_feature_set.py b/butterfree/transform/aggregated_feature_set.py index 813b73ce..516b6fed 100644 --- a/butterfree/transform/aggregated_feature_set.py +++ b/butterfree/transform/aggregated_feature_set.py @@ -576,11 +576,10 @@ def construct( pre_hook_df = self.run_pre_hooks(dataframe) - # Apply transformations + output_df = pre_hook_df for feature in self.keys + [self.timestamp]: - output_df = feature.transform(pre_hook_df) + output_df = feature.transform(output_df) - # Early filter data output_df = self.incremental_strategy.filter_with_incremental_strategy( dataframe=output_df, start_date=start_date, end_date=end_date )