Skip to content

Commit

Permalink
Add integration test. TODO: confirm expected behavior of the timestam…
Browse files Browse the repository at this point in the history
…p range and fix issues

Signed-off-by: Jun Ki Min <42475935+loomlike@users.noreply.github.com>
  • Loading branch information
loomlike committed Dec 3, 2022
1 parent 7eb9f50 commit e7be239
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 75 deletions.
17 changes: 9 additions & 8 deletions feathr_project/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ def workspace_dir() -> str:
return str(Path(__file__).parent.resolve().joinpath("test_user_workspace"))


@pytest.fixture
def mock_data_path(workspace_dir):
return str(Path(workspace_dir).joinpath(
"mockdata",
"feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net",
"demo_data",
"green_tripdata_2020-04.csv",
))
# TODO we can use this later
# @pytest.fixture
# def mock_data_path(workspace_dir):
# return str(Path(workspace_dir).joinpath(
# "mockdata",
# "feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net",
# "demo_data",
# "green_tripdata_2020-04.csv",
# ))


@pytest.fixture(scope="function")
Expand Down
142 changes: 75 additions & 67 deletions feathr_project/test/integration/test_settings.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from datetime import datetime, timedelta
from pathlib import Path
from tempfile import TemporaryDirectory

import pandas as pd
import pytest

from feathr import (
Expand All @@ -15,40 +18,59 @@
from feathr.utils.job_utils import get_result_df


TIMESTAMP_COL = "timestamp"
KEY_COL = "location_id"
FEATURE_COL = "fare"


@pytest.fixture(scope="session")
def mock_df():
"""Mock data for testing.
"""
# TODO Currently we're using "today" since `useLatestFeatureData` uses now().
# Once the behavior is changed to use the latest timestamp in the data, we can use fixed test data instead of creating new one everytime.
today = datetime.now().date()
date_range = list(pd.date_range(start=today-timedelta(days=4), end=today, freq="D"))
return pd.DataFrame({
TIMESTAMP_COL: date_range + date_range,
KEY_COL: [1, 1, 1, 1, 1, 2, 2, 2, 2, 2],
FEATURE_COL: [5.5, 10.0, 6.0, 8.0, 2.5, 38.0, 12.0, 52.0, 180.0, 3.5],
})


@pytest.mark.integration
def test__observation_settings(feathr_client, mock_data_path):
def test__observation_settings(feathr_client, mock_df):
tmp_dir = TemporaryDirectory()

# TODO
mock_data_path = str(Path("mock_data.csv").resolve())
# mock_distance_data_path = str(Path("mock_distance_data.csv").resolve())
mock_data_path = str(Path(tmp_dir.name, "mock_data.csv"))
mock_df.to_csv(str(mock_data_path), index=False)

# Upload data into dbfs or adls
if feathr_client.spark_runtime != "local":
mock_data_path = feathr_client.feathr_spark_launcher.upload_or_get_cloud_path(mock_data_path)
# mock_distance_data_path = feathr_client.feathr_spark_launcher.upload_or_get_cloud_path(mock_distance_data_path)

# Define agg features
source = HdfsSource(
name="source",
path=mock_data_path,
event_timestamp_column="lpep_pickup_datetime",
timestamp_format="yyyyMMdd",
event_timestamp_column=TIMESTAMP_COL,
timestamp_format="yyyy-MM-dd", # yyyy/MM/dd/HH/mm/ss
)
location_id_key = TypedKey(
key_column="DOLocationID",
key = TypedKey(
key_column=KEY_COL,
key_column_type=ValueType.INT32,
description="location id key",
full_name="location_id_key",
description="key",
full_name=KEY_COL,
)
agg_features = [
Feature(
name="f_location_max_fare",
key=location_id_key,
name=f"f_{FEATURE_COL}",
key=key,
feature_type=FLOAT,
transform=WindowAggTransformation(
agg_expr="cast_float(fare_amount)",
agg_expr=f"cast_float({FEATURE_COL})",
agg_func="MAX",
window="20d",
window="2d", # 2 days sliding window
),
),
]
Expand All @@ -58,75 +80,59 @@ def test__observation_settings(feathr_client, mock_data_path):
features=agg_features,
)

# distance_source = HdfsSource(
# name="distance_source",
# path=mock_distance_data_path,
# event_timestamp_column="lpep_pickup_datetime",
# timestamp_format="yyyy-MM-dd HH:mm:ss",
# )
# datetime_key = TypedKey(
# key_column="lpep_pickup_datetime",
# key_column_type=ValueType.INT64,
# description="datetime key",
# full_name="datetime_key",
# )
# features = [
# Feature(
# name="f_fare_amount",
# # name="f_trip_distance",
# # key=datetime_key,
# feature_type=FLOAT,
# transform="fare_amount",
# # transform="trip_distance",
# ),
# ]
# anchor = FeatureAnchor(
# name="anchor",
# source=INPUT_CONTEXT,
# # source=distance_source,
# features=features,
# )

feathr_client.build_features(
anchor_list=[
agg_anchor,
# anchor,
]
)

query = [
FeatureQuery(
feature_list=[
"f_location_max_fare",
# "f_trip_distance",
# "f_fare_amount",
f"f_{FEATURE_COL}",
],
key=location_id_key,
key=key,
),
]

observation_time_range_values = [
AbsoluteTimeRange(
start_time="20210102",
end_time="20210103",
time_format="yyyyMMdd",
test_parameters__expected_values = [
(
dict(event_timestamp_column=TIMESTAMP_COL),
# Max value by the sliding window '2d'
[5.5, 10., 10., 8., 8., 38., 38., 52., 180., 180.],
),
(
dict(use_latest_feature_data=True),
# The latest feature values: Time window is '2d' and thus the feature values for each key is 8.0 and 180.0
[8.0, 8.0, 8.0, 8.0, 8.0, 180.0, 180.0, 180.0, 180.0, 180.0],
),
(
dict(
event_timestamp_column=TIMESTAMP_COL,
observation_time_range=RelativeTimeRange(offset="3d", window="2d"),
),
# TODO BUG - RelativeTimeRange doesn't have any effect on the result
[5.5, 10., 10., 8., 8., 38., 38., 52., 180., 180.],
),
(
dict(
event_timestamp_column=TIMESTAMP_COL,
observation_time_range=AbsoluteTimeRange(
start_time=mock_df[TIMESTAMP_COL].max().date().isoformat(),
end_time=mock_df[TIMESTAMP_COL].max().date().isoformat(),
time_format="yyyy-MM-dd",
),
),
# TODO BUG - AbsoluteTimeRange doesn't have any effect on the result
[5.5, 10., 10., 8., 8., 38., 38., 52., 180., 180.],
),
# RelativeTimeRange(offset="10h", window="1d"),
# None,
]

# event_timestamp_column_values = [
# "lpep_pickup_datetime",
# None,
# ]

for obs_time_range in observation_time_range_values:
for test_params, expected_values in test_parameters__expected_values:
settings = ObservationSettings(
observation_path=mock_data_path,
event_timestamp_column="lpep_pickup_datetime",# TODOevent_timestamp_column,
timestamp_format="yyyyMMdd", # TODO check -- We only support yyyyMMdd format for this. In future, if there is a request, we can
# use_latest_feature_data=True, #use_latest_feature_data,
observation_time_range=obs_time_range,
timestamp_format="yyyy-MM-dd",
**test_params,
)

output_path = str(Path(Path(mock_data_path).parent, "output.avro"))
Expand All @@ -141,4 +147,6 @@ def test__observation_settings(feathr_client, mock_data_path):

# download result and assert the returned result
res_df = get_result_df(feathr_client)
print(res_df) #[["lpep_pickup_datetime", "f_location_max_fare", "f_trip_distance"]])
res_df.sort_values(by=[KEY_COL, TIMESTAMP_COL], inplace=True)
assert res_df[f"f_{FEATURE_COL}"].tolist() == expected_values
# print(res_df)

0 comments on commit e7be239

Please sign in to comment.