From 6a3068e32adbc5a32344f9efafd8178c226fa3d0 Mon Sep 17 00:00:00 2001 From: snehakumari321 Date: Sat, 4 Mar 2023 18:24:43 -0800 Subject: [PATCH] PEP8 check, EDA markdown --- README.md | 7 + components/get_data/run.py | 1 - components/setup.py | 7 +- components/test_regression_model/run.py | 29 ++-- components/train_val_test_split/run.py | 26 +-- components/wandb_utils/log_artifact.py | 11 +- main.py | 56 ++++--- src/basic_cleaning/MLproject | 4 +- src/basic_cleaning/run.py | 39 ++--- src/data_check/conftest.py | 20 +-- src/data_check/test_data.py | 32 ++-- src/eda/EDA.ipynb | 130 ++++++++++++++- .../feature_engineering.py | 5 +- src/train_random_forest/run.py | 149 +++++++++++------- 14 files changed, 338 insertions(+), 178 deletions(-) diff --git a/README.md b/README.md index f68c8ed55..958c8174e 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,10 @@ + +#### My Submission: +Github repo : https://github.com/snehakumari321/build-ml-pipeline-for-short-term-rental-prices + + +WandB : https://wandb.ai/sneha_kumari/nyc_airbnb + # Build an ML Pipeline for Short-Term Rental Prices in NYC You are working for a property management company renting rooms and properties for short periods of time on various rental platforms. You need to estimate the typical price for a given property based diff --git a/components/get_data/run.py b/components/get_data/run.py index b77d7e537..2d8337e86 100644 --- a/components/get_data/run.py +++ b/components/get_data/run.py @@ -15,7 +15,6 @@ def go(args): - run = wandb.init(job_type="download_file") run.config.update(args) diff --git a/components/setup.py b/components/setup.py index d81ccb960..53d19dfd6 100644 --- a/components/setup.py +++ b/components/setup.py @@ -5,14 +5,11 @@ name="wandb-utils", version=0.1, description="Utilities for interacting with Weights and Biases and mlflow", - zip_safe=False, # avoid eggs, which make the handling of package data cumbersome + zip_safe=False, # avoid eggs,make the handling of package data cumbersome packages=["wandb_utils"], classifiers=[ "Programming Language :: Python :: 3", "Development Status :: 4 - Beta", ], - install_requires=[ - "mlflow", - "wandb" - ] + install_requires=["mlflow", "wandb"], ) diff --git a/components/test_regression_model/run.py b/components/test_regression_model/run.py index f08b598e0..dff8f34e2 100644 --- a/components/test_regression_model/run.py +++ b/components/test_regression_model/run.py @@ -1,6 +1,7 @@ #!/usr/bin/env python """ -This step takes the best model, tagged with the "prod" tag, and tests it against the test dataset +This step takes the best model, tagged with the "prod" tag, +and tests it against the test dataset """ import argparse import logging @@ -17,13 +18,12 @@ def go(args): - run = wandb.init(job_type="test_model") run.config.update(args) logger.info("Downloading artifacts") - # Download input artifact. This will also log that this script is using this - # particular version of the artifact + # Download input artifact. 
This will also log that this script + # is using this particular version of the artifact model_local_path = run.use_artifact(args.mlflow_model).download() # Download test dataset @@ -46,28 +46,21 @@ def go(args): logger.info(f"MAE: {mae}") # Log MAE and r2 - run.summary['r2'] = r_squared - run.summary['mae'] = mae + run.summary["r2"] = r_squared + run.summary["mae"] = mae if __name__ == "__main__": - - parser = argparse.ArgumentParser(description="Test the provided model against the test dataset") - - parser.add_argument( - "--mlflow_model", - type=str, - help="Input MLFlow model", - required=True + parser = argparse.ArgumentParser( + description="Test the provided model against the test dataset" ) parser.add_argument( - "--test_dataset", - type=str, - help="Test dataset", - required=True + "--mlflow_model", type=str, help="Input MLFlow model", required=True ) + parser.add_argument("--test_dataset", type=str, help="Test dataset", required=True) + args = parser.parse_args() go(args) diff --git a/components/train_val_test_split/run.py b/components/train_val_test_split/run.py index 52709d9fe..0d1678a3e 100644 --- a/components/train_val_test_split/run.py +++ b/components/train_val_test_split/run.py @@ -15,12 +15,11 @@ def go(args): - run = wandb.init(job_type="train_val_test_split") run.config.update(args) - # Download input artifact. This will also note that this script is using this - # particular version of the artifact + # Download input artifact. This will also note that this script + # is using this particular version of the artifact logger.info(f"Fetching artifact {args.input}") artifact_local_path = run.use_artifact(args.input).file() @@ -31,14 +30,13 @@ def go(args): df, test_size=args.test_size, random_state=args.random_seed, - stratify=df[args.stratify_by] if args.stratify_by != 'none' else None, + stratify=df[args.stratify_by] if args.stratify_by != "none" else None, ) # Save to output files - for df, k in zip([trainval, test], ['trainval', 'test']): + for df, k in zip([trainval, test], ["trainval", "test"]): logger.info(f"Uploading {k}_data.csv dataset") with tempfile.NamedTemporaryFile("w") as fp: - df.to_csv(fp.name, index=False) log_artifact( @@ -56,15 +54,25 @@ def go(args): parser.add_argument("input", type=str, help="Input artifact to split") parser.add_argument( - "test_size", type=float, help="Size of the test split. Fraction of the dataset, or number of items" + "test_size", + type=float, + help="Size of the test split. 
" "Fraction of the dataset, or number of items", ) parser.add_argument( - "--random_seed", type=int, help="Seed for random number generator", default=42, required=False + "--random_seed", + type=int, + help="Seed for random number generator", + default=42, + required=False, ) parser.add_argument( - "--stratify_by", type=str, help="Column to use for stratification", default='none', required=False + "--stratify_by", + type=str, + help="Column to use for stratification", + default="none", + required=False, ) args = parser.parse_args() diff --git a/components/wandb_utils/log_artifact.py b/components/wandb_utils/log_artifact.py index c242c6072..46e505a08 100644 --- a/components/wandb_utils/log_artifact.py +++ b/components/wandb_utils/log_artifact.py @@ -2,13 +2,16 @@ import mlflow -def log_artifact(artifact_name, artifact_type, artifact_description, filename, wandb_run): +def log_artifact( + artifact_name, artifact_type, artifact_description, filename, wandb_run +): """ - Log the provided filename as an artifact in W&B, and add the artifact path to the MLFlow run - so it can be retrieved by subsequent steps in a pipeline + Log the provided filename as an artifact in W&B, and add the artifact path + to the MLFlow run so it can be retrieved by subsequent steps in a pipeline :param artifact_name: name for the artifact - :param artifact_type: type for the artifact (just a string like "raw_data", "clean_data" and so on) + :param artifact_type: type for the artifact + (just a string like "raw_data", "clean_data" and so on) :param artifact_description: a brief description of the artifact :param filename: local filename for the artifact :param wandb_run: current Weights & Biases run diff --git a/main.py b/main.py index 18936cadc..92a8f9d28 100644 --- a/main.py +++ b/main.py @@ -14,38 +14,37 @@ "data_split", "train_random_forest", # NOTE: We do not include this in the steps so it is not run by mistake. - # You first need to promote a model export to "prod" before you can run this, + # You first need to promote a model export to "prod" + # before you can run this, # then you need to run this step explicitly -# "test_regression_model" + # "test_regression_model" ] # This automatically reads in the configuration -@hydra.main(config_name='config') +@hydra.main(config_name="config") def go(config: DictConfig): - # Setup the wandb experiment. 
All runs will be grouped under this name os.environ["WANDB_PROJECT"] = config["main"]["project_name"] os.environ["WANDB_RUN_GROUP"] = config["main"]["experiment_name"] # Steps to execute - steps_par = config['main']['steps'] + steps_par = config["main"]["steps"] active_steps = steps_par.split(",") if steps_par != "all" else _steps # Move to a temporary directory with tempfile.TemporaryDirectory() as tmp_dir: - if "download" in active_steps: # Download file and load in W&B _ = mlflow.run( f"{config['main']['components_repository']}/get_data", "main", - version='main', + version="main", parameters={ "sample": config["etl"]["sample"], "artifact_name": "sample.csv", "artifact_type": "raw_data", - "artifact_description": "Raw file as downloaded" + "artifact_description": "Raw file as downloaded", }, ) @@ -58,8 +57,8 @@ def go(config: DictConfig): "output_artifact": "clean_sample.csv", "output_type": "clean_sample", "output_description": "Data with outliers and null values removed", - "min_price": config['etl']['min_price'], - "max_price": config['etl']['max_price'] + "min_price": config["etl"]["min_price"], + "max_price": config["etl"]["max_price"], }, ) @@ -72,56 +71,61 @@ def go(config: DictConfig): "csv": "clean_sample.csv:latest", "ref": "clean_sample.csv:reference", "kl_threshold": config["data_check"]["kl_threshold"], - "min_price": config['etl']['min_price'], - "max_price": config['etl']['max_price'] + "min_price": config["etl"]["min_price"], + "max_price": config["etl"]["max_price"], }, ) if "data_split" in active_steps: _ = mlflow.run( - f"{config['main']['components_repository']}/train_val_test_split", + f"{config['main']['components_repository']}/" f"train_val_test_split", "main", - version='main', + version="main", parameters={ "input": "clean_sample.csv:latest", "test_size": config["modeling"]["test_size"], "random_seed": config["modeling"]["random_seed"], - "stratify": config['modeling']['stratify_by'] + "stratify": config["modeling"]["stratify_by"], }, ) if "train_random_forest" in active_steps: - - # NOTE: we need to serialize the random forest configuration into JSON + # NOTE: we need to serialize the random forest + # configuration into JSON rf_config = os.path.abspath("rf_config.json") with open(rf_config, "w+") as fp: - json.dump(dict(config["modeling"]["random_forest"].items()), fp) # DO NOT TOUCH + json.dump( + dict(config["modeling"]["random_forest"].items()), fp + ) # DO NOT TOUCH - # NOTE: use the rf_config we just created as the rf_config parameter for the train_random_forest + # NOTE: use the rf_config we just created as the + # rf_config parameter for the train_random_forest # step _ = mlflow.run( - os.path.join(hydra.utils.get_original_cwd(), "src", "train_random_forest"), + os.path.join( + hydra.utils.get_original_cwd(), "src", "train_random_forest" + ), "main", parameters={ "trainval_artifact": "clean_sample.csv:latest", "val_size": config["modeling"]["test_size"], "random_seed": config["modeling"]["random_seed"], - "stratify": config['modeling']['stratify_by'], + "stratify": config["modeling"]["stratify_by"], "rf_config": rf_config, - "max_tfidf_features": config['modeling']['max_tfidf_features'], - "output_artifact": "random_forest_export" + "max_tfidf_features": config["modeling"]["max_tfidf_features"], + "output_artifact": "random_forest_export", }, ) if "test_regression_model" in active_steps: _ = mlflow.run( - f"{config['main']['components_repository']}/test_regression_model", + f"{config['main']['components_repository']}/" f"test_regression_model", "main", 
- version='main', + version="main", parameters={ "mlflow_model": "random_forest_export:prod", - "test_dataset": "test_data.csv:latest" + "test_dataset": "test_data.csv:latest", }, ) diff --git a/src/basic_cleaning/MLproject b/src/basic_cleaning/MLproject index ff2542580..1da2ecc08 100644 --- a/src/basic_cleaning/MLproject +++ b/src/basic_cleaning/MLproject @@ -23,11 +23,11 @@ entry_points: min_price: description: parameter for minimum price - type: string + type: float max_price: description: parameter for maximum price - type: string + type: float command: >- diff --git a/src/basic_cleaning/run.py b/src/basic_cleaning/run.py index daeddf32f..47a25dc30 100644 --- a/src/basic_cleaning/run.py +++ b/src/basic_cleaning/run.py @@ -1,6 +1,7 @@ #!/usr/bin/env python """ -Download from W&B the raw dataset and apply some basic data cleaning, exporting the result to a new artifact +Download from W&B the raw dataset and apply some basic data cleaning, +exporting the result to a new artifact """ import argparse import logging @@ -13,12 +14,11 @@ def go(args): - run = wandb.init(job_type="basic_cleaning") run.config.update(args) - # Download input artifact. This will also log that this script is using this - # particular version of the artifact + # Download input artifact. This will also log that this script + # is using this particular version of the artifact # artifact_local_path = run.use_artifact(args.input_artifact).file() run = wandb.init(project="nyc_airbnb", group="eda", save_code=True) @@ -26,14 +26,14 @@ def go(args): df = pd.read_csv(artifact_local_path) # Drop ouliers from price column - idx = df['price'].between(args.min_price, args.max_price) + idx = df["price"].between(args.min_price, args.max_price) df = df[idx].copy() # Convert last_review to datetime - df['last_review'] = pd.to_datetime(df['last_review']) + df["last_review"] = pd.to_datetime(df["last_review"]) # Filter proper boundary - idx = df['longitude'].between(-74.25, -73.50) & df['latitude'].between(40.5, 41.2) + idx = df["longitude"].between(-74.25, -73.50) & df["latitude"].between(40.5, 41.2) df = df[idx].copy() df.to_csv(args.output_artifact, index=False) @@ -52,52 +52,39 @@ def go(args): if __name__ == "__main__": - parser = argparse.ArgumentParser(description="A very basic data cleaning") parser.add_argument( "--input_artifact", type=str, help="name of input artifact for this pipeline step ", - required=True + required=True, ) parser.add_argument( "--output_artifact", type=str, help="name of the output artifact for this pipeline step", - required=True + required=True, ) - parser.add_argument( - "--output_type", - type=str, - help="type of output", - required=True - ) + parser.add_argument("--output_type", type=str, help="type of output", required=True) parser.add_argument( "--output_description", type=str, help="description of output artifact", - required=True + required=True, ) parser.add_argument( - "--min_price", - type=float, - help="parameter for minimum price", - required=True + "--min_price", type=float, help="parameter for minimum price", required=True ) parser.add_argument( - "--max_price", - type=float, - help="parameter for maximum price", - required=True + "--max_price", type=float, help="parameter for maximum price", required=True ) - args = parser.parse_args() go(args) diff --git a/src/data_check/conftest.py b/src/data_check/conftest.py index b000a45da..ac1824567 100644 --- a/src/data_check/conftest.py +++ b/src/data_check/conftest.py @@ -11,12 +11,12 @@ def pytest_addoption(parser): 
parser.addoption("--max_price", action="store") -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def data(request): run = wandb.init(job_type="data_tests", resume=True) - # Download input artifact. This will also note that this script is using this - # particular version of the artifact + # Download input artifact. This will also note that this script + # is using this particular version of the artifact data_path = run.use_artifact(request.config.option.csv).file() if data_path is None: @@ -27,12 +27,12 @@ def data(request): return df -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def ref_data(request): run = wandb.init(job_type="data_tests", resume=True) - # Download input artifact. This will also note that this script is using this - # particular version of the artifact + # Download input artifact. This will also note that this script + # is using this particular version of the artifact data_path = run.use_artifact(request.config.option.ref).file() if data_path is None: @@ -43,7 +43,7 @@ def ref_data(request): return df -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def kl_threshold(request): kl_threshold = request.config.option.kl_threshold @@ -52,7 +52,8 @@ def kl_threshold(request): return float(kl_threshold) -@pytest.fixture(scope='session') + +@pytest.fixture(scope="session") def min_price(request): min_price = request.config.option.min_price @@ -61,7 +62,8 @@ def min_price(request): return float(min_price) -@pytest.fixture(scope='session') + +@pytest.fixture(scope="session") def max_price(request): max_price = request.config.option.max_price diff --git a/src/data_check/test_data.py b/src/data_check/test_data.py index 8be572fde..21fcdaac2 100644 --- a/src/data_check/test_data.py +++ b/src/data_check/test_data.py @@ -4,7 +4,6 @@ def test_column_names(data): - expected_colums = [ "id", "name", @@ -31,10 +30,9 @@ def test_column_names(data): def test_neighborhood_names(data): - known_names = ["Bronx", "Brooklyn", "Manhattan", "Queens", "Staten Island"] - neigh = set(data['neighbourhood_group'].unique()) + neigh = set(data["neighbourhood_group"].unique()) # Unordered check assert set(known_names) == set(neigh) @@ -42,20 +40,26 @@ def test_neighborhood_names(data): def test_proper_boundaries(data: pd.DataFrame): """ - Test proper longitude and latitude boundaries for properties in and around NYC + Test proper longitude and latitude boundaries for properties + in and around NYC """ - idx = data['longitude'].between(-74.25, -73.50) & data['latitude'].between(40.5, 41.2) + idx = data["longitude"].between(-74.25, -73.50) & data["latitude"].between( + 40.5, 41.2 + ) assert np.sum(~idx) == 0 -def test_similar_neigh_distrib(data: pd.DataFrame, ref_data: pd.DataFrame, kl_threshold: float): +def test_similar_neigh_distrib( + data: pd.DataFrame, ref_data: pd.DataFrame, kl_threshold: float +): """ - Apply a threshold on the KL divergence to detect if the distribution of the new data is - significantly different than that of the reference dataset + Apply a threshold on the KL divergence to detect if the distribution + of the new data is significantly different than that of the reference + dataset """ - dist1 = data['neighbourhood_group'].value_counts().sort_index() - dist2 = ref_data['neighbourhood_group'].value_counts().sort_index() + dist1 = data["neighbourhood_group"].value_counts().sort_index() + dist2 = ref_data["neighbourhood_group"].value_counts().sort_index() assert scipy.stats.entropy(dist1, dist2, base=2) < kl_threshold @@ -67,9 
+71,13 @@ def test_row_count(data):
     """
     assert 15000 < data.shape[0] < 1000000
 
+
 def test_price_range(data, min_price, max_price):
     """
-    Test if the price feature has values within the min_price and max_price range
+    Test if the price feature has values within the min_price
+    and max_price range
     """
-    assert all(data['price'].between(min_price, max_price))
+    assert all(data["price"].between(min_price, max_price))
+
+
+########################################################
diff --git a/src/eda/EDA.ipynb b/src/eda/EDA.ipynb
index 9f659df56..f021c64b5 100644
--- a/src/eda/EDA.ipynb
+++ b/src/eda/EDA.ipynb
@@ -1,9 +1,43 @@
 {
  "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "478eb885",
+   "metadata": {},
+   "source": [
+    "# Exploratory Data Analysis for predicting Short-term Rental Prices in NYC"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "37143504",
+   "metadata": {},
+   "source": [
+    "### 1) Problem:\n",
+    "\n",
+    "#### Assumption:\n",
+    "We are working for a property management company renting rooms and properties for short periods of time on various platforms. \n",
+    "#### Task:\n",
+    "We need to estimate the typical price for a given property based on the price of similar properties.\n",
+    "\n",
+    "As a first step, we explore the data provided to us to analyse its components, the distribution of the different features and the correlations between features, and we check for any abnormalities and fix them before using the data for model development."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "f7f54d90",
+   "metadata": {},
+   "source": [
+    "### 2) Import relevant libraries:\n",
+    "- wandb: a dashboard to keep track of your experiments and artifacts so you can compare models and keep track of the findings.\n",
+    "- pandas: Python library for data manipulation\n",
+    "- pandas_profiling: Python library that performs an automated Exploratory Data Analysis and provides us with a detailed report."
+   ]
+  },
   {
    "cell_type": "code",
-   "execution_count": 1,
-   "id": "522c165d",
+   "execution_count": null,
+   "id": "7a897eac",
    "metadata": {},
    "outputs": [],
    "source": [
@@ -11,6 +45,18 @@
     "import wandb\n",
     "import pandas as pd"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "id": "6ba8977f",
+   "metadata": {},
+   "source": [
+    "### 3) Import Data\n",
+    "We connect our project to Weights & Biases (W&B) and retrieve the data from the artifact section.\n",
+    "- Here we first initialize the project and set save_code to True to make sure that the notebook is uploaded and versioned by W&B\n",
+    "- We download the artifact file and get its local path\n",
+    "- We use pandas to read the file from the local path"
+   ]
+  },
  {
   "cell_type": "code",
   "execution_count": 2,
@@ -80,6 +126,14 @@
    "df = pd.read_csv(local_path)"
   ]
  },
+  {
+   "cell_type": "markdown",
+   "id": "f6ff9f68",
+   "metadata": {},
+   "source": [
+    "Let's take a look at the data"
+   ]
+  },
  {
   "cell_type": "code",
   "execution_count": 3,
@@ -192,9 +246,21 @@
    }
   ],
   "source": [
+    "# Check the first 2 rows\n",
    "df.head(2)"
   ]
  },
+  {
+   "cell_type": "markdown",
+   "id": "8ec07c15",
+   "metadata": {},
+   "source": [
+    "### 4) Automatic EDA using Pandas Profiling\n",
+    "We use pandas profiling to generate an automated preliminary analysis report for the data.\n",
+    "- We first create a profile report for the input dataframe\n",
+    "- We display the report"
+   ]
+  },
  {
   "cell_type": "code",
   "execution_count": 4,
@@ -272,6 +338,14 @@
    "profile.to_widgets()"
   ]
  },
+  {
+   "cell_type": "markdown",
+   "id": "d8d3c3db",
+   "metadata": {},
+   "source": [
+    "- We convert the report to an HTML format to view, analyse and share it easily."
+   ]
+  },
  {
   "cell_type": "code",
   "execution_count": 9,
@@ -302,10 +376,30 @@
   "id": "43b2c120",
   "metadata": {},
   "source": [
-    "#### Analysis:\n",
-    "There are no duplicate rows.\n",
-    "There are 2.6% missing values with last_review having a lot of missing values.\n",
-    "Price has a very skewed plot suggesting effect of outliers."
+    "### 5) Analysis:\n",
+    "- There are no duplicate rows.\n",
+    "- There are 2.6% missing values overall, with last_review having a lot of missing values.\n",
+    "- The price and number_of_reviews features have very skewed distributions, suggesting the effect of outliers.\n",
+    "- The last_review feature has a string datatype but should be a datetime.\n",
+    "\n",
+    "- Correlation: the number_of_reviews, reviews_per_month and calculated_host_listings_count features have negative correlation coefficients with price. This suggests that a highly priced property has few reviews overall as well as per month, and a low calculated_host_listings_count."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "cad193c2",
+   "metadata": {},
+   "source": [
+    "### 6) Data Cleanup\n",
+    "We try to address a few of the problems listed above."
+ ] + }, + { + "cell_type": "markdown", + "id": "a5e66d41", + "metadata": {}, + "source": [ + "- We drop the outliers for the price feature and bound it within a reasonable range for our analysis" ] }, { @@ -322,6 +416,14 @@ "df =df[idx].copy()" ] }, + { + "cell_type": "markdown", + "id": "de59e654", + "metadata": {}, + "source": [ + "- We convert the last_review feature to its appropriate datatype of datetime" + ] + }, { "cell_type": "code", "execution_count": 6, @@ -333,6 +435,14 @@ "df['last_review'] = pd.to_datetime(df['last_review'])" ] }, + { + "cell_type": "markdown", + "id": "f9daa5aa", + "metadata": {}, + "source": [ + "We verify if the changes are reflected" + ] + }, { "cell_type": "code", "execution_count": 7, @@ -373,6 +483,14 @@ "df.info()" ] }, + { + "cell_type": "markdown", + "id": "897a1d06", + "metadata": {}, + "source": [ + "### 7) End the W&B run." + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/src/train_random_forest/feature_engineering.py b/src/train_random_forest/feature_engineering.py index d4a1c7369..58b3bc42e 100644 --- a/src/train_random_forest/feature_engineering.py +++ b/src/train_random_forest/feature_engineering.py @@ -4,8 +4,9 @@ def delta_date_feature(dates): """ - Given a 2d array containing dates (in any format recognized by pd.to_datetime), it returns the delta in days + Given a 2d array containing dates (in any format recognized + by pd.to_datetime), it returns the delta in days between each date and the most recent date in its column """ date_sanitized = pd.DataFrame(dates).apply(pd.to_datetime) - return date_sanitized.apply(lambda d: (d.max() -d).dt.days, axis=0).to_numpy() + return date_sanitized.apply(lambda d: (d.max() - d).dt.days, axis=0).to_numpy() diff --git a/src/train_random_forest/run.py b/src/train_random_forest/run.py index 6b5c4713a..d79c0cd64 100644 --- a/src/train_random_forest/run.py +++ b/src/train_random_forest/run.py @@ -27,11 +27,12 @@ def delta_date_feature(dates): """ - Given a 2d array containing dates (in any format recognized by pd.to_datetime), it returns the delta in days + Given a 2d array containing dates (in any format recognized + by pd.to_datetime), it returns the delta in days between each date and the most recent date in its column """ date_sanitized = pd.DataFrame(dates).apply(pd.to_datetime) - return date_sanitized.apply(lambda d: (d.max() -d).dt.days, axis=0).to_numpy() + return date_sanitized.apply(lambda d: (d.max() - d).dt.days, axis=0).to_numpy() logging.basicConfig(level=logging.INFO, format="%(asctime)-15s %(message)s") @@ -39,7 +40,6 @@ def delta_date_feature(dates): def go(args): - run = wandb.init(job_type="train_random_forest") run.config.update(args) @@ -49,32 +49,41 @@ def go(args): run.config.update(rf_config) # Fix the random seed for the Random Forest, so we get reproducible results - rf_config['random_state'] = args.random_seed + rf_config["random_state"] = args.random_seed ###################################### - # Use run.use_artifact(...).file() to get the train and validation artifact (args.trainval_artifact) - # and save the returned path in train_local_pat + # Use run.use_artifact(...).file() to get the train and validation + # artifact (args.trainval_artifact) and save the returned path in + # train_local_pat trainval_local_path = run.use_artifact(args.trainval_artifact).file() ###################################### X = pd.read_csv(trainval_local_path) - y = X.pop("price") # this removes the column "price" from X and puts it into y + y = X.pop("price") + # this removes 
the column "price" from X and puts it into y logger.info(f"Minimum price: {y.min()}, Maximum price: {y.max()}") X_train, X_val, y_train, y_val = train_test_split( - X, y, test_size=args.val_size, stratify=X[args.stratify_by], random_state=args.random_seed + X, + y, + test_size=args.val_size, + stratify=X[args.stratify_by], + random_state=args.random_seed, ) logger.info("Preparing sklearn pipeline") - sk_pipe, processed_features = get_inference_pipeline(rf_config, args.max_tfidf_features) + sk_pipe, processed_features = get_inference_pipeline( + rf_config, args.max_tfidf_features + ) # Then fit it to the X_train, y_train data logger.info("Fitting") ###################################### - # Fit the pipeline sk_pipe by calling the .fit method on X_train and y_train + # Fit the pipeline sk_pipe by calling the + # .fit method on X_train and y_train sk_pipe.fit(X_train, y_train) ###################################### @@ -95,23 +104,35 @@ def go(args): shutil.rmtree("random_forest_dir") ###################################### - # Save the sk_pipe pipeline as a mlflow.sklearn model in the directory "random_forest_dir" + # Save the sk_pipe pipeline as a mlflow.sklearn model in the + # directory "random_forest_dir" # HINT: use mlflow.sklearn.save_model signature = mlflow.models.infer_signature(X_val[processed_features], y_pred) - export_path = "random_forest_dir" - mlflow.sklearn.save_model(sk_pipe, export_path, serialization_format = mlflow.sklearn.SERIALIZATION_FORMAT_CLOUDPICKLE, - signature=signature, input_example= X_val.iloc[:2]) + mlflow.sklearn.save_model( + sk_pipe, + export_path, + serialization_format=mlflow.sklearn.SERIALIZATION_FORMAT_CLOUDPICKLE, + signature=signature, + input_example=X_val.iloc[:2], + ) ###################################### ###################################### # Upload the model we just exported to W&B - # HINT: use wandb.Artifact to create an artifact. Use args.output_artifact as artifact name, "model_export" as - # type, provide a description and add rf_config as metadata. Then, use the .add_dir method of the artifact instance - # you just created to add the "random_forest_dir" directory to the artifact, and finally use - # run.log_artifact to log the artifact to the run - artifact = wandb.Artifact(args.output_artifact, type="model_export", description= "Random forest pipeline export", metadata=rf_config) + # HINT: use wandb.Artifact to create an artifact. Use args.output_artifact + # as artifact name, "model_export" as type, provide a description and + # add rf_config as metadata. Then, use the .add_dir method of the + # artifact instance you just created to add the "random_forest_dir" + # directory to the artifact,and finally use run.log_artifact + # to log the artifact to the run + artifact = wandb.Artifact( + args.output_artifact, + type="model_export", + description="Random forest pipeline export", + metadata=rf_config, + ) artifact.add_dir(export_path) run.log_artifact(artifact) artifact.wait() @@ -122,25 +143,27 @@ def go(args): ###################################### # Here we save r_squared under the "r2" key - run.summary['r2'] = r_squared + run.summary["r2"] = r_squared # Now log the variable "mae" under the key "mae". 
- run.summary['mae'] = mae + run.summary["mae"] = mae ###################################### # Upload to W&B the feture importance visualization run.log( { - "feature_importance": wandb.Image(fig_feat_imp), + "feature_importance": wandb.Image(fig_feat_imp), } ) def plot_feature_importance(pipe, feat_names): # We collect the feature importance for all non-nlp features first - feat_imp = pipe["random_forest"].feature_importances_[: len(feat_names)-1] + feat_imp = pipe["random_forest"].feature_importances_[: len(feat_names) - 1] # For the NLP feature we sum across all the TF-IDF dimensions into a global # NLP importance - nlp_importance = sum(pipe["random_forest"].feature_importances_[len(feat_names) - 1:]) + nlp_importance = sum( + pipe["random_forest"].feature_importances_[len(feat_names) - 1 :] + ) feat_imp = np.append(feat_imp, nlp_importance) fig_feat_imp, sub_feat_imp = plt.subplots(figsize=(10, 10)) # idx = np.argsort(feat_imp)[::-1] @@ -153,13 +176,15 @@ def plot_feature_importance(pipe, feat_names): def get_inference_pipeline(rf_config, max_tfidf_features): # Let's handle the categorical features first - # Ordinal categorical are categorical values for which the order is meaningful, for example - # for room type: 'Entire home/apt' > 'Private room' > 'Shared room' + # Ordinal categorical are categorical values for which the order is + # meaningful, for example for room type: + # 'Entire home/apt' > 'Private room' > 'Shared room' ordinal_categorical = ["room_type"] non_ordinal_categorical = ["neighbourhood_group"] # NOTE: we do not need to impute room_type because the type of the room - # is mandatory on the websites, so missing values are not possible in production - # (nor during training). That is not true for neighbourhood_group + # is mandatory on the websites, so missing values are not possible + # in production (nor during training). That is not true for + # neighbourhood_group ordinal_categorical_preproc = OrdinalEncoder() ###################################### @@ -167,12 +192,12 @@ def get_inference_pipeline(rf_config, max_tfidf_features): # 1 - A SimpleImputer(strategy="most_frequent") to impute missing values # 2 - A OneHotEncoder() step to encode the variable non_ordinal_categorical_preproc = make_pipeline( - SimpleImputer(strategy="most_frequent"), - OneHotEncoder() + SimpleImputer(strategy="most_frequent"), OneHotEncoder() ) ###################################### - # Let's impute the numerical columns to make sure we can handle missing values + # Let's impute the numerical columns to make sure we can handle missing + # values # (note that we do not scale because the RF algorithm does not need that) zero_imputed = [ "minimum_nights", @@ -181,17 +206,18 @@ def get_inference_pipeline(rf_config, max_tfidf_features): "calculated_host_listings_count", "availability_365", "longitude", - "latitude" + "latitude", ] zero_imputer = SimpleImputer(strategy="constant", fill_value=0) # A MINIMAL FEATURE ENGINEERING step: - # we create a feature that represents the number of days passed since the last review - # First we impute the missing review date with an old date (because there hasn't been - # a review for a long time), and then we create a new feature from it, + # we create a feature that represents the number of days passed since + # the last review. 
First we impute the missing review date with an old + # date (because there hasn't been a review for a long time), and then we + # create a new feature from it, date_imputer = make_pipeline( - SimpleImputer(strategy='constant', fill_value='2010-01-01'), - FunctionTransformer(delta_date_feature, check_inverse=False, validate=False) + SimpleImputer(strategy="constant", fill_value="2010-01-01"), + FunctionTransformer(delta_date_feature, check_inverse=False, validate=False), ) # Some minimal NLP for the "name" column @@ -200,9 +226,7 @@ def get_inference_pipeline(rf_config, max_tfidf_features): SimpleImputer(strategy="constant", fill_value=""), reshape_to_1d, TfidfVectorizer( - binary=False, - max_features=max_tfidf_features, - stop_words='english' + binary=False, max_features=max_tfidf_features, stop_words="english" ), ) @@ -210,49 +234,58 @@ def get_inference_pipeline(rf_config, max_tfidf_features): preprocessor = ColumnTransformer( transformers=[ ("ordinal_cat", ordinal_categorical_preproc, ordinal_categorical), - ("non_ordinal_cat", non_ordinal_categorical_preproc, non_ordinal_categorical), + ( + "non_ordinal_cat", + non_ordinal_categorical_preproc, + non_ordinal_categorical, + ), ("impute_zero", zero_imputer, zero_imputed), ("transform_date", date_imputer, ["last_review"]), - ("transform_name", name_tfidf, ["name"]) + ("transform_name", name_tfidf, ["name"]), ], remainder="drop", # This drops the columns that we do not transform ) - processed_features = ordinal_categorical + non_ordinal_categorical + zero_imputed + ["last_review", "name"] + processed_features = ( + ordinal_categorical + + non_ordinal_categorical + + zero_imputed + + ["last_review", "name"] + ) # Create random forest random_Forest = RandomForestRegressor(**rf_config) ###################################### - # Create the inference pipeline. The pipeline must have 2 steps: a step called "preprocessor" applying the - # ColumnTransformer instance that we saved in the `preprocessor` variable, and a step called "random_forest" - # with the random forest instance that we just saved in the `random_forest` variable. - # HINT: Use the explicit Pipeline constructor so you can assign the names to the steps, do not use make_pipeline + # Create the inference pipeline. The pipeline must have 2 steps: + # a step called "preprocessor" applying the ColumnTransformer instance that + # we saved in the `preprocessor` variable, and a step called + # "random_forest" with the random forest instance that we just saved + # in the `random_forest` variable. + # HINT: Use the explicit Pipeline constructor so you can assign + # the names to the steps, do not use make_pipeline sk_pipe = Pipeline( - steps=[ - ("preprocessor", preprocessor), - ("random_forest", random_Forest) - - ], + steps=[("preprocessor", preprocessor), ("random_forest", random_Forest)], ) return sk_pipe, processed_features if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Basic cleaning of dataset") parser.add_argument( "--trainval_artifact", type=str, - help="Artifact containing the training dataset. It will be split into train and validation" + help="Artifact containing the training dataset. " + "It will be split into train and validation", ) parser.add_argument( "--val_size", type=float, - help="Size of the validation split. Fraction of the dataset, or number of items", + help="Size of the validation split. 
Fraction of the dataset," + " or number of items", ) parser.add_argument( @@ -273,8 +306,8 @@ def get_inference_pipeline(rf_config, max_tfidf_features): parser.add_argument( "--rf_config", - help="Random forest configuration. A JSON dict that will be passed to the " - "scikit-learn constructor for RandomForestRegressor.", + help="Random forest configuration. A JSON dict that will be passed" + " to the scikit-learn constructor for RandomForestRegressor.", default="{}", ) @@ -282,7 +315,7 @@ def get_inference_pipeline(rf_config, max_tfidf_features): "--max_tfidf_features", help="Maximum number of words to consider for the TFIDF", default=10, - type=int + type=int, ) parser.add_argument(