
Commit

fix: deduplicate
ralphrass committed Aug 22, 2024
1 parent 19d296b commit e5305f3
Showing 2 changed files with 138 additions and 162 deletions.
2 changes: 1 addition & 1 deletion butterfree/load/writers/historical_feature_store_writer.py
@@ -251,7 +251,7 @@ def validate(
if self.interval_mode and not self.debug_mode
else spark_client.read_table(table_name).count()
)

dataframe_count = dataframe.count()

self._assert_validation_count(table_name, written_count, dataframe_count)
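# Note (an assumption, since _assert_validation_count is not shown in this hunk):
# the call above is expected to compare written_count with dataframe_count and
# fail validation when the two row counts differ, which is what a deduplicated
# write is being checked against here.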
298 changes: 137 additions & 161 deletions tests/unit/butterfree/transform/conftest.py
@@ -1,7 +1,10 @@
import json
from unittest.mock import Mock

import pyspark.pandas as ps
from pyspark.sql import functions
from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType
from pytest import fixture

from butterfree.constants import DataType
@@ -25,6 +28,74 @@ def create_dataframe(data, timestamp_col="ts"):
return df


def create_dataframe_from_data(
spark_context, spark_session, data, timestamp_col="timestamp", use_json=False
):
if use_json:
df = spark_session.read.json(
spark_context.parallelize(data).map(lambda x: json.dumps(x))
)
else:
df = create_dataframe(data, timestamp_col=timestamp_col)

df = df.withColumn(timestamp_col, col(timestamp_col).cast(TimestampType()))
return df

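# Illustrative usage (a sketch with hypothetical data): either branch yields a
# DataFrame whose timestamp column is cast to TimestampType.
#
#     data = [{"id": 1, "timestamp": "2016-04-11 00:00:00", "feature1": 100.0}]
#     df = create_dataframe_from_data(spark_context, spark_session, data, use_json=True)
#     # df.schema["timestamp"].dataType == TimestampType()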

def create_rolling_windows_agg_dataframe(
spark_context, spark_session, data, timestamp_col="timestamp", use_json=False
):
if use_json:
df = spark_session.read.json(
spark_context.parallelize(data).map(lambda x: json.dumps(x))
)
df = df.withColumn(
timestamp_col, col(timestamp_col).cast(DataType.TIMESTAMP.spark)
)
else:
df = create_dataframe(data, timestamp_col=timestamp_col)

return df


def build_data(rows, base_features, dynamic_features=None):
"""
Constrói uma lista de dicionários para DataFrame com recursos dinâmicos.
:param rows: Lista de tuplas com (id, timestamp, base_values, dynamic_values).
:param base_features: Lista de nomes de recursos base (strings).
:param dynamic_features: Lista de nomes de recursos dinâmicos,
mapeando para o índice de dynamic_values (opcional).
:return: Lista de dicionários para criação do DataFrame.
"""
data = []
for row in rows:
id_value, timestamp_value, base_values, dynamic_values = row

entry = {
"id": id_value,
"timestamp": timestamp_value,
}

# Add the base feature values
entry.update(
{feature: value for feature, value in zip(base_features, base_values)}
)

# Add the dynamic feature values, if any
if dynamic_features:
entry.update(
{
feature: dynamic_values[idx]
for idx, feature in enumerate(dynamic_features)
}
)

data.append(entry)

return data

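# Illustrative usage of build_data (a sketch; the feature names are hypothetical):
#
#     rows = [(1, "2016-04-11 00:00:00", [100.0, 200.0], None)]
#     build_data(rows, base_features=["feature1", "feature2"])
#     # -> [{"id": 1, "timestamp": "2016-04-11 00:00:00",
#     #      "feature1": 100.0, "feature2": 200.0}]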

def make_dataframe(spark_context, spark_session):
data = [
{
@@ -91,186 +162,91 @@ def make_output_filtering_dataframe(spark_context, spark_session):


def make_rolling_windows_agg_dataframe(spark_context, spark_session):
data = [
{
"id": 1,
"timestamp": "2016-04-11 00:00:00",
"feature1__avg_over_1_week_rolling_windows": None,
"feature2__avg_over_1_week_rolling_windows": None,
},
{
"id": 1,
"timestamp": "2016-04-12 00:00:00",
"feature1__avg_over_1_week_rolling_windows": 300.0,
"feature2__avg_over_1_week_rolling_windows": 350.0,
},
{
"id": 1,
"timestamp": "2016-04-19 00:00:00",
"feature1__avg_over_1_week_rolling_windows": None,
"feature2__avg_over_1_week_rolling_windows": None,
},
{
"id": 1,
"timestamp": "2016-04-23 00:00:00",
"feature1__avg_over_1_week_rolling_windows": 1000.0,
"feature2__avg_over_1_week_rolling_windows": 1100.0,
},
{
"id": 1,
"timestamp": "2016-04-30 00:00:00",
"feature1__avg_over_1_week_rolling_windows": None,
"feature2__avg_over_1_week_rolling_windows": None,
},
rows = [
(1, "2016-04-11 00:00:00", [None, None], None),
(1, "2016-04-12 00:00:00", [300.0, 350.0], None),
(1, "2016-04-19 00:00:00", [None, None], None),
(1, "2016-04-23 00:00:00", [1000.0, 1100.0], None),
(1, "2016-04-30 00:00:00", [None, None], None),
]

base_features = [
"feature1__avg_over_1_week_rolling_windows",
"feature2__avg_over_1_week_rolling_windows",
]
return create_dataframe(data, timestamp_col="timestamp")

data = build_data(rows, base_features)
return create_dataframe_from_data(spark_context, spark_session, data)


def make_rolling_windows_hour_slide_agg_dataframe(spark_context, spark_session):
data = [
{
"id": 1,
"timestamp": "2016-04-11 12:00:00",
"feature1__avg_over_1_day_rolling_windows": 266.6666666666667,
"feature2__avg_over_1_day_rolling_windows": 300.0,
},
{
"id": 1,
"timestamp": "2016-04-12 00:00:00",
"feature1__avg_over_1_day_rolling_windows": 300.0,
"feature2__avg_over_1_day_rolling_windows": 350.0,
},
{
"id": 1,
"timestamp": "2016-04-12 12:00:00",
"feature1__avg_over_1_day_rolling_windows": 400.0,
"feature2__avg_over_1_day_rolling_windows": 500.0,
},
rows = [
(1, "2016-04-11 12:00:00", [266.6666666666667, 300.0], None),
(1, "2016-04-12 00:00:00", [300.0, 350.0], None),
(1, "2016-04-12 12:00:00", [400.0, 500.0], None),
]

base_features = [
"feature1__avg_over_1_day_rolling_windows",
"feature2__avg_over_1_day_rolling_windows",
]
return create_dataframe(data, timestamp_col="timestamp")

data = build_data(rows, base_features)
return create_dataframe_from_data(spark_context, spark_session, data)


def make_multiple_rolling_windows_hour_slide_agg_dataframe(
spark_context, spark_session
):
data = [
{
"id": 1,
"timestamp": "2016-04-11 12:00:00",
"feature1__avg_over_2_days_rolling_windows": 266.6666666666667,
"feature1__avg_over_3_days_rolling_windows": 266.6666666666667,
"feature2__avg_over_2_days_rolling_windows": 300.0,
"feature2__avg_over_3_days_rolling_windows": 300.0,
},
{
"id": 1,
"timestamp": "2016-04-12 00:00:00",
"feature1__avg_over_2_days_rolling_windows": 300.0,
"feature1__avg_over_3_days_rolling_windows": 300.0,
"feature2__avg_over_2_days_rolling_windows": 350.0,
"feature2__avg_over_3_days_rolling_windows": 350.0,
},
{
"id": 1,
"timestamp": "2016-04-13 12:00:00",
"feature1__avg_over_2_days_rolling_windows": 400.0,
"feature1__avg_over_3_days_rolling_windows": 300.0,
"feature2__avg_over_2_days_rolling_windows": 500.0,
"feature2__avg_over_3_days_rolling_windows": 350.0,
},
{
"id": 1,
"timestamp": "2016-04-14 00:00:00",
"feature1__avg_over_3_days_rolling_windows": 300.0,
"feature2__avg_over_3_days_rolling_windows": 350.0,
},
{
"id": 1,
"timestamp": "2016-04-14 12:00:00",
"feature1__avg_over_3_days_rolling_windows": 400.0,
"feature2__avg_over_3_days_rolling_windows": 500.0,
},
rows = [
(
1,
"2016-04-11 12:00:00",
[],
[266.6666666666667, 266.6666666666667, 300.0, 300.0],
),
(1, "2016-04-12 00:00:00", [], [300.0, 300.0, 350.0, 350.0]),
(1, "2016-04-13 12:00:00", [], [400.0, 300.0, 500.0, 350.0]),
(1, "2016-04-14 00:00:00", [], [None, 300.0, None, 350.0]),
(1, "2016-04-14 12:00:00", [], [None, 400.0, None, 500.0]),
]
return create_dataframe(data, timestamp_col="timestamp")


def make_rolling_windows_hour_slide_agg_dataframe(spark_context, spark_session):
data = [
{
"id": 1,
"timestamp": "2016-04-11 12:00:00",
"feature1__avg_over_1_day_rolling_windows": 266.6666666666667,
"feature2__avg_over_1_day_rolling_windows": 300.0,
},
{
"id": 1,
"timestamp": "2016-04-12 00:00:00",
"feature1__avg_over_1_day_rolling_windows": 300.0,
"feature2__avg_over_1_day_rolling_windows": 350.0,
},
{
"id": 1,
"timestamp": "2016-04-12 12:00:00",
"feature1__avg_over_1_day_rolling_windows": 400.0,
"feature2__avg_over_1_day_rolling_windows": 500.0,
},
dynamic_features = [
"feature1__avg_over_2_days_rolling_windows",
"feature1__avg_over_3_days_rolling_windows",
"feature2__avg_over_2_days_rolling_windows",
"feature2__avg_over_3_days_rolling_windows",
]
df = spark_session.read.json(
spark_context.parallelize(data).map(lambda x: json.dumps(x))
)
df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))

return df
data = build_data(rows, [], dynamic_features=dynamic_features)
return create_dataframe_from_data(spark_context, spark_session, data, use_json=True)


def make_multiple_rolling_windows_hour_slide_agg_dataframe(
spark_context, spark_session
def create_rolling_window_dataframe(
spark_context, spark_session, rows, base_features, dynamic_features=None
):
data = [
{
"id": 1,
"timestamp": "2016-04-11 12:00:00",
"feature1__avg_over_2_days_rolling_windows": 266.6666666666667,
"feature1__avg_over_3_days_rolling_windows": 266.6666666666667,
"feature2__avg_over_2_days_rolling_windows": 300.0,
"feature2__avg_over_3_days_rolling_windows": 300.0,
},
{
"id": 1,
"timestamp": "2016-04-12 00:00:00",
"feature1__avg_over_2_days_rolling_windows": 300.0,
"feature1__avg_over_3_days_rolling_windows": 300.0,
"feature2__avg_over_2_days_rolling_windows": 350.0,
"feature2__avg_over_3_days_rolling_windows": 350.0,
},
{
"id": 1,
"timestamp": "2016-04-13 12:00:00",
"feature1__avg_over_2_days_rolling_windows": 400.0,
"feature1__avg_over_3_days_rolling_windows": 300.0,
"feature2__avg_over_2_days_rolling_windows": 500.0,
"feature2__avg_over_3_days_rolling_windows": 350.0,
},
{
"id": 1,
"timestamp": "2016-04-14 00:00:00",
"feature1__avg_over_3_days_rolling_windows": 300.0,
"feature2__avg_over_3_days_rolling_windows": 350.0,
},
{
"id": 1,
"timestamp": "2016-04-14 12:00:00",
"feature1__avg_over_3_days_rolling_windows": 400.0,
"feature2__avg_over_3_days_rolling_windows": 500.0,
},
]
df = spark_session.read.json(
spark_context.parallelize(data).map(lambda x: json.dumps(x))
)
"""
Cria um DataFrame com recursos de rolagem de janelas agregadas.
:param spark_context: Contexto do Spark.
:param spark_session: Sessão do Spark.
:param rows: Lista de tuplas com (id, timestamp, base_values, dynamic_values).
:param base_features: Lista de nomes de recursos base (strings).
:param dynamic_features: Lista de nomes de recursos dinâmicos,
mapeando para o índice de dynamic_values (opcional).
:return: DataFrame do Spark.
"""
data = build_data(rows, base_features, dynamic_features)

# Convert the list of dictionaries into a Spark RDD of JSON strings
rdd = spark_context.parallelize(data).map(lambda x: json.dumps(x))

# Create the Spark DataFrame from the RDD
df = spark_session.read.json(rdd)

# Cast the "timestamp" column to the TIMESTAMP type
df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))

return df

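# Illustrative usage of create_rolling_window_dataframe (a sketch; the feature
# name is hypothetical and the Spark fixtures come from the test session):
#
#     rows = [(1, "2016-04-11 00:00:00", [300.0], None)]
#     df = create_rolling_window_dataframe(
#         spark_context, spark_session, rows, ["feature1__avg_over_1_week_rolling_windows"]
#     )
#     # -> columns: id, timestamp (TIMESTAMP) and the rolling-window feature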

def make_fs(spark_context, spark_session):
df = make_dataframe(spark_context, spark_session)

1 comment on commit e5305f3


@chip-n-dale (bot) commented on e5305f3 on Aug 22, 2024


Hi @ralphrass!

The GitLeaks SecTool reported some possibly exposed credentials/secrets; how about giving them a look?

GitLeaks Alert Sync

In case of false positives, more information is available on GitLeaks FAQ
If you have any other problem or question during this process, contact us in the Security space on GChat!
