diff --git a/butterfree/load/writers/historical_feature_store_writer.py b/butterfree/load/writers/historical_feature_store_writer.py
index d286d6e2..99bfe66a 100644
--- a/butterfree/load/writers/historical_feature_store_writer.py
+++ b/butterfree/load/writers/historical_feature_store_writer.py
@@ -251,7 +251,7 @@ def validate(
             if self.interval_mode and not self.debug_mode
             else spark_client.read_table(table_name).count()
         )
-
+
         dataframe_count = dataframe.count()
         self._assert_validation_count(table_name, written_count, dataframe_count)

diff --git a/tests/unit/butterfree/transform/conftest.py b/tests/unit/butterfree/transform/conftest.py
index 25d2a47b..c0ebb47a 100644
--- a/tests/unit/butterfree/transform/conftest.py
+++ b/tests/unit/butterfree/transform/conftest.py
@@ -1,7 +1,10 @@
+import json
 from unittest.mock import Mock

 import pyspark.pandas as ps
 from pyspark.sql import functions
+from pyspark.sql.functions import col
+from pyspark.sql.types import TimestampType
 from pytest import fixture

 from butterfree.constants import DataType
@@ -25,6 +28,74 @@ def create_dataframe(data, timestamp_col="ts"):
     return df


+def create_dataframe_from_data(
+    spark_context, spark_session, data, timestamp_col="timestamp", use_json=False
+):
+    """Create a DataFrame from a list of dicts and cast the timestamp column."""
+    if use_json:
+        df = spark_session.read.json(
+            spark_context.parallelize(data).map(lambda x: json.dumps(x))
+        )
+    else:
+        df = create_dataframe(data, timestamp_col=timestamp_col)
+
+    df = df.withColumn(timestamp_col, col(timestamp_col).cast(TimestampType()))
+    return df
+
+
+def create_rolling_windows_agg_dataframe(
+    spark_context, spark_session, data, timestamp_col="timestamp", use_json=False
+):
+    """Create a rolling-windows aggregation DataFrame from a list of dicts."""
+    if use_json:
+        df = spark_session.read.json(
+            spark_context.parallelize(data).map(lambda x: json.dumps(x))
+        )
+        df = df.withColumn(
+            timestamp_col, col(timestamp_col).cast(DataType.TIMESTAMP.spark)
+        )
+    else:
+        df = create_dataframe(data, timestamp_col=timestamp_col)
+
+    return df
+
+
+def build_data(rows, base_features, dynamic_features=None):
+    """
+    Build a list of dictionaries for a DataFrame with dynamic features.
+
+    :param rows: list of tuples with (id, timestamp, base_values, dynamic_values).
+    :param base_features: list of base feature names (strings).
+    :param dynamic_features: list of dynamic feature names, mapped by index to
+        dynamic_values (optional).
+    :return: list of dictionaries used to create the DataFrame.
+    """
+    data = []
+    for row in rows:
+        id_value, timestamp_value, base_values, dynamic_values = row
+
+        entry = {
+            "id": id_value,
+            "timestamp": timestamp_value,
+        }
+
+        # Add the base feature values
+        entry.update(
+            {feature: value for feature, value in zip(base_features, base_values)}
+        )
+
+        # Add the dynamic feature values, if any
+        if dynamic_features:
+            entry.update(
+                {
+                    feature: dynamic_values[idx]
+                    for idx, feature in enumerate(dynamic_features)
+                }
+            )
+
+        data.append(entry)
+
+    return data
+
+
 def make_dataframe(spark_context, spark_session):
     data = [
         {
@@ -91,186 +162,91 @@ def make_output_filtering_dataframe(spark_context, spark_session):


 def make_rolling_windows_agg_dataframe(spark_context, spark_session):
-    data = [
-        {
-            "id": 1,
-            "timestamp": "2016-04-11 00:00:00",
-            "feature1__avg_over_1_week_rolling_windows": None,
-            "feature2__avg_over_1_week_rolling_windows": None,
-        },
-        {
-            "id": 1,
-            "timestamp": "2016-04-12 00:00:00",
-            "feature1__avg_over_1_week_rolling_windows": 300.0,
-            "feature2__avg_over_1_week_rolling_windows": 350.0,
-        },
-        {
-            "id": 1,
-            "timestamp": "2016-04-19 00:00:00",
-            "feature1__avg_over_1_week_rolling_windows": None,
-            "feature2__avg_over_1_week_rolling_windows": None,
-        },
-        {
-            "id": 1,
-            "timestamp": "2016-04-23 00:00:00",
-            "feature1__avg_over_1_week_rolling_windows": 1000.0,
-            "feature2__avg_over_1_week_rolling_windows": 1100.0,
-        },
-        {
-            "id": 1,
-            "timestamp": "2016-04-30 00:00:00",
-            "feature1__avg_over_1_week_rolling_windows": None,
-            "feature2__avg_over_1_week_rolling_windows": None,
-        },
+    rows = [
+        (1, "2016-04-11 00:00:00", [None, None], None),
+        (1, "2016-04-12 00:00:00", [300.0, 350.0], None),
+        (1, "2016-04-19 00:00:00", [None, None], None),
+        (1, "2016-04-23 00:00:00", [1000.0, 1100.0], None),
+        (1, "2016-04-30 00:00:00", [None, None], None),
+    ]
+
+    base_features = [
+        "feature1__avg_over_1_week_rolling_windows",
+        "feature2__avg_over_1_week_rolling_windows",
     ]
-    return create_dataframe(data, timestamp_col="timestamp")
+
+    data = build_data(rows, base_features)
+    return create_dataframe_from_data(spark_context, spark_session, data)


 def make_rolling_windows_hour_slide_agg_dataframe(spark_context, spark_session):
-    data = [
-        {
-            "id": 1,
-            "timestamp": "2016-04-11 12:00:00",
-            "feature1__avg_over_1_day_rolling_windows": 266.6666666666667,
-            "feature2__avg_over_1_day_rolling_windows": 300.0,
-        },
-        {
-            "id": 1,
-            "timestamp": "2016-04-12 00:00:00",
-            "feature1__avg_over_1_day_rolling_windows": 300.0,
-            "feature2__avg_over_1_day_rolling_windows": 350.0,
-        },
-        {
-            "id": 1,
-            "timestamp": "2016-04-12 12:00:00",
-            "feature1__avg_over_1_day_rolling_windows": 400.0,
-            "feature2__avg_over_1_day_rolling_windows": 500.0,
-        },
+    rows = [
+        (1, "2016-04-11 12:00:00", [266.6666666666667, 300.0], None),
+        (1, "2016-04-12 00:00:00", [300.0, 350.0], None),
+        (1, "2016-04-12 12:00:00", [400.0, 500.0], None),
+    ]
+
+    base_features = [
+        "feature1__avg_over_1_day_rolling_windows",
+        "feature2__avg_over_1_day_rolling_windows",
     ]
-    return create_dataframe(data, timestamp_col="timestamp")
+
+    data = build_data(rows, base_features)
+    return create_dataframe_from_data(spark_context, spark_session, data)


 def make_multiple_rolling_windows_hour_slide_agg_dataframe(
     spark_context, spark_session
 ):
-    data = [
-        {
-            "id": 1,
-            "timestamp": "2016-04-11 12:00:00",
-            "feature1__avg_over_2_days_rolling_windows": 266.6666666666667,
-            "feature1__avg_over_3_days_rolling_windows": 266.6666666666667,
"feature2__avg_over_2_days_rolling_windows": 300.0, - "feature2__avg_over_3_days_rolling_windows": 300.0, - }, - { - "id": 1, - "timestamp": "2016-04-12 00:00:00", - "feature1__avg_over_2_days_rolling_windows": 300.0, - "feature1__avg_over_3_days_rolling_windows": 300.0, - "feature2__avg_over_2_days_rolling_windows": 350.0, - "feature2__avg_over_3_days_rolling_windows": 350.0, - }, - { - "id": 1, - "timestamp": "2016-04-13 12:00:00", - "feature1__avg_over_2_days_rolling_windows": 400.0, - "feature1__avg_over_3_days_rolling_windows": 300.0, - "feature2__avg_over_2_days_rolling_windows": 500.0, - "feature2__avg_over_3_days_rolling_windows": 350.0, - }, - { - "id": 1, - "timestamp": "2016-04-14 00:00:00", - "feature1__avg_over_3_days_rolling_windows": 300.0, - "feature2__avg_over_3_days_rolling_windows": 350.0, - }, - { - "id": 1, - "timestamp": "2016-04-14 12:00:00", - "feature1__avg_over_3_days_rolling_windows": 400.0, - "feature2__avg_over_3_days_rolling_windows": 500.0, - }, + rows = [ + ( + 1, + "2016-04-11 12:00:00", + [], + [266.6666666666667, 266.6666666666667, 300.0, 300.0], + ), + (1, "2016-04-12 00:00:00", [], [300.0, 300.0, 350.0, 350.0]), + (1, "2016-04-13 12:00:00", [], [400.0, 300.0, 500.0, 350.0]), + (1, "2016-04-14 00:00:00", [], [None, 300.0, None, 350.0]), + (1, "2016-04-14 12:00:00", [], [None, 400.0, None, 500.0]), ] - return create_dataframe(data, timestamp_col="timestamp") - -def make_rolling_windows_hour_slide_agg_dataframe(spark_context, spark_session): - data = [ - { - "id": 1, - "timestamp": "2016-04-11 12:00:00", - "feature1__avg_over_1_day_rolling_windows": 266.6666666666667, - "feature2__avg_over_1_day_rolling_windows": 300.0, - }, - { - "id": 1, - "timestamp": "2016-04-12 00:00:00", - "feature1__avg_over_1_day_rolling_windows": 300.0, - "feature2__avg_over_1_day_rolling_windows": 350.0, - }, - { - "id": 1, - "timestamp": "2016-04-12 12:00:00", - "feature1__avg_over_1_day_rolling_windows": 400.0, - "feature2__avg_over_1_day_rolling_windows": 500.0, - }, + dynamic_features = [ + "feature1__avg_over_2_days_rolling_windows", + "feature1__avg_over_3_days_rolling_windows", + "feature2__avg_over_2_days_rolling_windows", + "feature2__avg_over_3_days_rolling_windows", ] - df = spark_session.read.json( - spark_context.parallelize(data).map(lambda x: json.dumps(x)) - ) - df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark)) - return df + data = build_data(rows, [], dynamic_features=dynamic_features) + return create_dataframe_from_data(spark_context, spark_session, data, use_json=True) -def make_multiple_rolling_windows_hour_slide_agg_dataframe( - spark_context, spark_session +def create_rolling_window_dataframe( + spark_context, spark_session, rows, base_features, dynamic_features=None ): - data = [ - { - "id": 1, - "timestamp": "2016-04-11 12:00:00", - "feature1__avg_over_2_days_rolling_windows": 266.6666666666667, - "feature1__avg_over_3_days_rolling_windows": 266.6666666666667, - "feature2__avg_over_2_days_rolling_windows": 300.0, - "feature2__avg_over_3_days_rolling_windows": 300.0, - }, - { - "id": 1, - "timestamp": "2016-04-12 00:00:00", - "feature1__avg_over_2_days_rolling_windows": 300.0, - "feature1__avg_over_3_days_rolling_windows": 300.0, - "feature2__avg_over_2_days_rolling_windows": 350.0, - "feature2__avg_over_3_days_rolling_windows": 350.0, - }, - { - "id": 1, - "timestamp": "2016-04-13 12:00:00", - "feature1__avg_over_2_days_rolling_windows": 400.0, - "feature1__avg_over_3_days_rolling_windows": 300.0, - 
"feature2__avg_over_2_days_rolling_windows": 500.0, - "feature2__avg_over_3_days_rolling_windows": 350.0, - }, - { - "id": 1, - "timestamp": "2016-04-14 00:00:00", - "feature1__avg_over_3_days_rolling_windows": 300.0, - "feature2__avg_over_3_days_rolling_windows": 350.0, - }, - { - "id": 1, - "timestamp": "2016-04-14 12:00:00", - "feature1__avg_over_3_days_rolling_windows": 400.0, - "feature2__avg_over_3_days_rolling_windows": 500.0, - }, - ] - df = spark_session.read.json( - spark_context.parallelize(data).map(lambda x: json.dumps(x)) - ) + """ + Cria um DataFrame com recursos de rolagem de janelas agregadas. + + :param spark_context: Contexto do Spark. + :param spark_session: Sessão do Spark. + :param rows: Lista de tuplas com (id, timestamp, base_values, dynamic_values). + :param base_features: Lista de nomes de recursos base (strings). + :param dynamic_features: Lista de nomes de recursos dinâmicos, + mapeando para o índice de dynamic_values (opcional). + :return: DataFrame do Spark. + """ + data = build_data(rows, base_features, dynamic_features) + + # Converte a lista de dicionários em um RDD do Spark + rdd = spark_context.parallelize(data).map(lambda x: json.dumps(x)) + + # Cria o DataFrame do Spark a partir do RDD + df = spark_session.read.json(rdd) + + # Converte a coluna "timestamp" para o tipo TIMESTAMP df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark)) - return df - def make_fs(spark_context, spark_session): df = make_dataframe(spark_context, spark_session)