diff --git a/.flake8 b/.flake8 new file mode 100644 index 00000000..d4d777c0 --- /dev/null +++ b/.flake8 @@ -0,0 +1,4 @@ +[flake8] +max-line-length = 120 +ignore = E203, E266, E501, W503 +exclude = .venv, .git, __pycache__, build, dist diff --git a/.github/ISSUE_TEMPLATE/bug.md b/.github/ISSUE_TEMPLATE/bug.md new file mode 100644 index 00000000..4a080196 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug.md @@ -0,0 +1,37 @@ +--- +name: Bug +about: Create a bug report +title: '' +labels: bug +assignees: + +--- + +## 🐛 Bug + + + +### To Reproduce + +Steps to reproduce the behavior: + +1. Go to '...' +2. Run '....' +3. ... + + + +### Expected behavior + + + +### Additional context + + + +### Checklist + +- [ ] bug description +- [ ] steps to reproduce +- [ ] expected behavior +- [ ] code sample / screenshots diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md new file mode 100644 index 00000000..e1d38447 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.md @@ -0,0 +1,34 @@ +--- +name: Feature request +about: Suggest a feature to implement +title: '' +labels: enhancement +assignees: + +--- + +## 🚀 Feature Request + + + +### Motivation + + + +### Proposal + + + +### Alternatives + + + +### Additional context + + + +### Checklist + +- [ ] feature proposal description +- [ ] motivation +- [ ] additional context / proposal alternatives review diff --git a/.github/ISSUE_TEMPLATE/question.md b/.github/ISSUE_TEMPLATE/question.md new file mode 100644 index 00000000..833ed552 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/question.md @@ -0,0 +1,26 @@ +--- +name: How to question +about: Asking how-to questions +title: '' +labels: help wanted, question +assignees: +--- + +## ❓ Questions and Help + +### Before asking: + +1. search the issues. +2. search the docs. + +#### What is your question? + +#### Code + + + +#### What have you tried? 
+ +### Additional context + + diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 00000000..b8f7af4e --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,37 @@ +name: Continuous Integration + +on: + push: + branches: [ master, dev/master ] + paths-ignore: + - "docs/**" + - "*.md" + pull_request: + branches: [ master, dev/master ] + paths-ignore: + - "docs/**" + - "*.md" + +jobs: + build: + runs-on: ${{ matrix.os }} + strategy: + matrix: + python-version: [ 3.8, 3.9, 3.10.10 ] + os: [ ubuntu-latest, macos-latest, windows-latest ] + + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install poetry + poetry install + - name: Run pre-commit checks + run: poetry run pre-commit run --all-files + - name: Run tests + run: poetry run pytest diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 00000000..74495fe5 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,46 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.5.0 + hooks: + - id: check-yaml + - id: debug-statements + - id: end-of-file-fixer + - id: trailing-whitespace + + - repo: https://github.com/psf/black + rev: 23.11.0 + hooks: + - id: black + language_version: python3.10 + + - repo: https://github.com/PyCQA/flake8 + rev: 6.1.0 + hooks: + - id: flake8 + language_version: python3.10 + + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.7.0 + hooks: + - id: mypy + language_version: python3.10 + + - repo: https://github.com/terrencepreilly/darglint + rev: v1.8.1 + hooks: + - id: darglint + language_version: python3.10 + + + - repo: https://github.com/myint/rstcheck + rev: v6.2.0 + hooks: + - id: rstcheck + language_version: python3.10 + exclude: ^docs/_templates/ + + - repo: https://github.com/pre-commit/mirrors-isort + rev: v5.10.1 + hooks: + - id: isort + language_version: python3.10 diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 371c5753..671f36e8 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -24,4 +24,4 @@ formats: all python: install: - requirements: docs/requirements.txt - - path: . \ No newline at end of file + - path: . diff --git a/docs/conf.py b/docs/conf.py index c7886289..d27a2372 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -15,6 +15,7 @@ import os import sys + CURR_PATH = os.path.abspath(os.path.dirname(__file__)) LIB_PATH = os.path.join(CURR_PATH, os.path.pardir) sys.path.insert(0, LIB_PATH) @@ -37,7 +38,12 @@ "IPython.sphinxext.ipython_console_highlighting", ] -exclude_patterns = ["_build/*", "**.ipynb_checkpoints"] +exclude_patterns = [ + "_build/*", + "**.ipynb_checkpoints", + "Thumbs.db", + ".DS_Store", +] # Delete external references autosummary_mock_imports = [ @@ -62,11 +68,6 @@ # Add any paths that contain templates here, relative to this directory. templates_path = ["_templates"] -# List of patterns, relative to source directory, that match files and -# directories to ignore when looking for source files. -# This pattern also affects html_static_path and html_extra_path. -exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"] - # -- Options for HTML output ------------------------------------------------- # The theme to use for HTML and HTML Help pages. 
See the documentation for diff --git a/docs/index.rst b/docs/index.rst deleted file mode 100644 index bb6f2460..00000000 --- a/docs/index.rst +++ /dev/null @@ -1,31 +0,0 @@ -HypEx documentation -========================= - -`HypEx `_ is an open-source Python library aimed at automated Causal Inference. -It is designed to be lightweight and efficient for various matching and A/B-testing tasks. -HypEx provides easy-to-use pipeline creation that enables: - -- Automatic data processing -- Automatic typing and feature selection -- Automatic matching pipeline -- Automatic A/B testing pipeline -- Automatic report creation -- Automatic validation of results -- Easy-to-use modular scheme to create your own pipelines - - - -.. toctree:: - :maxdepth: 1 - :caption: Contents - - Installation Guide - Tutorials - Python-API - - - -Indices and Tables -================== - -* :ref:`genindex` diff --git a/docs/pages/Installation.rst b/docs/pages/Installation.rst deleted file mode 100644 index e68dcd7f..00000000 --- a/docs/pages/Installation.rst +++ /dev/null @@ -1,28 +0,0 @@ -Installation Guide -================== - - -Basic ------ - -You can install the `HypEx` library from PyPI. - -.. code-block:: bash - - pip install hypex - - -Development ------------ - -You can also clone the repository and install it with poetry. -First, install `poetry `_. -Then, - -.. code-block:: bash - - git clone git@github.com:sb-ai-lab/hypex.git - - cd hypex - - poetry install diff --git a/docs/pages/Python-API.rst b/docs/pages/Python-API.rst deleted file mode 100644 index 36b5008f..00000000 --- a/docs/pages/Python-API.rst +++ /dev/null @@ -1,13 +0,0 @@ -Python-API -========== - - -.. toctree:: - :maxdepth: 2 - :caption: Main modules - - modules/matcher - modules/ab_test - modules/selectors - modules/utils - modules/algorithms diff --git a/docs/pages/Tutorials.rst b/docs/pages/Tutorials.rst deleted file mode 100644 index b153c2f6..00000000 --- a/docs/pages/Tutorials.rst +++ /dev/null @@ -1,10 +0,0 @@ -Tutorials -========= - - -.. toctree:: - :maxdepth: 1 - :caption: Contents - - tutorials/Tutorial_1_Matching.nblink - tutorials/Tutorial_2_ABtesting.nblink diff --git a/docs/pages/modules/ab_test.rst b/docs/pages/modules/ab_test.rst deleted file mode 100644 index e3c582e7..00000000 --- a/docs/pages/modules/ab_test.rst +++ /dev/null @@ -1,23 +0,0 @@ -.. currentmodule:: hypex.ab_test.ab_tester - -AB Testing -=================== - -Classes for AB testing analysis. - -AB Test Classes ------------------------- - -.. autosummary:: - :toctree: ./generated - :nosignatures: - :template: classtemplate.rst - - AATest - ABTest - - -Utility Functions ------------------ - -.. autofunction:: merge_groups \ No newline at end of file diff --git a/docs/pages/modules/algorithms.rst b/docs/pages/modules/algorithms.rst deleted file mode 100644 index 36051eaa..00000000 --- a/docs/pages/modules/algorithms.rst +++ /dev/null @@ -1,52 +0,0 @@ -.. currentmodule:: hypex.algorithms - -Algorithms -=================== - -Classes for fast nearest-neighbor search - -Matching classes ------------------------- - -.. autosummary:: - :toctree: ./generated - :nosignatures: - :template: classtemplate.rst - - FaissMatcher - MatcherNoReplacement - -Utility Functions ------------------------- -.. currentmodule:: hypex.algorithms.faiss_matcher -.. 
autosummary:: - :toctree: ./generated - :nosignatures: - :template: functiontemplate.rst - - map_func - f2 - _get_index - _transform_to_np - calc_atx_var - calc_atc_se - conditional_covariance - calc_att_se - calc_ate_se - pval_calc - scaled_counts - bias_coefs - bias - f3 - -.. currentmodule:: hypex.algorithms.no_replacement_matching -.. autosummary:: - :toctree: ./generated - :nosignatures: - :template: functiontemplate.rst - - optimally_match_distance_matrix - _ensure_array_columnlike - - - diff --git a/docs/pages/modules/matcher.rst b/docs/pages/modules/matcher.rst deleted file mode 100644 index 745187e2..00000000 --- a/docs/pages/modules/matcher.rst +++ /dev/null @@ -1,16 +0,0 @@ -.. currentmodule:: hypex.matcher - -Matcher -=================== - -Class for calculate Matching via nearest neighbors - -Matching classes ----------------------------- - -.. autosummary:: - :toctree: ./generated - :nosignatures: - :template: classtemplate.rst - - Matcher diff --git a/docs/pages/modules/selectors.rst b/docs/pages/modules/selectors.rst deleted file mode 100644 index 38148a79..00000000 --- a/docs/pages/modules/selectors.rst +++ /dev/null @@ -1,27 +0,0 @@ -.. currentmodule:: hypex.selectors - -Selectors -====================== - -Classes for feature selectors - -Selector classes ------------------------------ - -.. autosummary:: - :toctree: ./generated - :nosignatures: - :template: classtemplate.rst - - feature_selector.FeatureSelector - outliers_filter.OutliersFilter - spearman_filter.SpearmanFilter - -.. currentmodule:: hypex.selectors.base_filtration -.. autosummary:: - :toctree: ./generated - :nosignatures: - :template: functiontemplate.rst - - const_filtration - nan_filtration diff --git a/docs/pages/modules/utils.rst b/docs/pages/modules/utils.rst deleted file mode 100644 index a4106107..00000000 --- a/docs/pages/modules/utils.rst +++ /dev/null @@ -1,67 +0,0 @@ -.. currentmodule:: hypex.utils - -Utils -======================= - -Classes for some utils - -Utils classes ------------------------------------ - -.. autosummary:: - :toctree: ./generated - :nosignatures: - :template: classtemplate.rst - - psi_pandas.PSI - - -Base metrics for quality results ----------------------------------------- - -.. currentmodule:: hypex.utils.metrics -.. autosummary:: - :toctree: ./generated - :nosignatures: - :template: functiontemplate.rst - - smd - ks - matching_quality - check_repeats - -Functions for calculate PSI ----------------------------------------------- -.. currentmodule:: hypex.utils.psi_pandas -.. autosummary:: - :toctree: ./generated - :nosignatures: - :template: functiontemplate.rst - - report - -Dataset creation for tutorials and testing --------------------------------------------------------- - -.. currentmodule:: hypex.utils.tutorial_data_creation -.. autosummary:: - :toctree: ./generated - :nosignatures: - :template: functiontemplate.rst - - set_nans - create_test_data - -Validators ----------------------------- - -.. currentmodule:: hypex.utils.validators -.. 
autosummary:: - :toctree: ./generated - :nosignatures: - :template: functiontemplate.rst - - random_treatment - random_feature - subset_refuter - test_significance diff --git a/docs/pages/tutorials/Tutorial_1_Matching.nblink b/docs/pages/tutorials/Tutorial_1_Matching.nblink deleted file mode 100644 index 0222cd93..00000000 --- a/docs/pages/tutorials/Tutorial_1_Matching.nblink +++ /dev/null @@ -1,3 +0,0 @@ -{ - "path": "../../../examples/tutorials/Tutorial_1_Matching.ipynb" -} diff --git a/docs/pages/tutorials/Tutorial_2_ABtesting.nblink b/docs/pages/tutorials/Tutorial_2_ABtesting.nblink deleted file mode 100644 index fe6ea010..00000000 --- a/docs/pages/tutorials/Tutorial_2_ABtesting.nblink +++ /dev/null @@ -1,3 +0,0 @@ -{ - "path": "../../../examples/tutorials/Tutorial_2_ABtesting.ipynb" -} diff --git a/docs/requirements.txt b/docs/requirements.txt index 7a817df5..74f25e2d 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -5,4 +5,4 @@ sphinx-autodoc-typehints pandoc jupyter prompt-toolkit<3.0.0,!=3.0.1,>=2.0.0 -sphinx_rtd_theme \ No newline at end of file +sphinx_rtd_theme diff --git a/hypex/__init__.py b/hypex/__init__.py deleted file mode 100644 index a8f5e1fd..00000000 --- a/hypex/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Tools to configure the resource matcher.""" -from .matcher import Matcher - - -__all__ = ["Matcher"] \ No newline at end of file diff --git a/hypex/ab_test/__init__.py b/hypex/ab_test/__init__.py deleted file mode 100644 index 1c9db15d..00000000 --- a/hypex/ab_test/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from hypex.ab_test.ab_tester import AATest, ABTest, merge_groups - -__all__ = ["AATest", "ABTest", "merge_groups"] \ No newline at end of file diff --git a/hypex/ab_test/ab_tester.py b/hypex/ab_test/ab_tester.py deleted file mode 100644 index 3c7dfe55..00000000 --- a/hypex/ab_test/ab_tester.py +++ /dev/null @@ -1,577 +0,0 @@ -import warnings -from copy import copy - -from IPython.display import display -from pathlib import Path -from sklearn.utils import shuffle -from typing import Iterable, Union, Optional, Dict, Any, Tuple - -from tqdm.auto import tqdm - -import pandas as pd -import numpy as np -from scipy.stats import ttest_ind, ks_2samp, mannwhitneyu - - -def merge_groups( - test_group: Union[Iterable[pd.DataFrame], pd.DataFrame], control_group: Union[Iterable[pd.DataFrame], pd.DataFrame], -) -> pd.DataFrame: - """Merges test and control groups into one DataFrame and creates column "group". - - Column "group" consists of "test" and "control" values. - - Args: - test_group: Data of target group - control_group: Data of control group - - Returns: - merged_data: Concatenated DataFrame - """ - test_group.loc[:, "group"] = "test" - control_group.loc[:, "group"] = "control" - - merged_data = pd.concat([test_group, control_group], ignore_index=True) - - return merged_data - - -class AATest: - def __init__( - self, - target_fields: Union[Iterable[str], str], - info_cols: Union[Iterable[str], str] = None, - group_cols: Union[str, Iterable[str]] = None, - quant_field: str = None, - mode: str = "simple", - ): - self.target_fields = [target_fields] if isinstance(target_fields, str) else target_fields - self.info_cols = [info_cols] if isinstance(info_cols, str) else info_cols - self.group_cols = [group_cols] if isinstance(group_cols, str) else group_cols - self.quant_field = quant_field - self.mode = mode - - def _preprocessing_data(self, data: pd.DataFrame) -> pd.DataFrame: - """Converts categorical variables to dummy variables.
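As a quick illustration of `merge_groups` above, a minimal sketch (the `revenue` column is hypothetical):

```python
import pandas as pd

# Hypothetical two-row groups; merge_groups labels each row and concatenates.
test = pd.DataFrame({"revenue": [10.0, 12.0]})
control = pd.DataFrame({"revenue": [9.0, 11.0]})

merged = merge_groups(test, control)
# merged["group"] -> ["test", "test", "control", "control"]
```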
- - Args: - data: Input data - - Returns: - clean_data: Data with categorical variables converted to dummy variables - """ - # categorical to dummies - prep_data = data.copy() - init_cols = data.columns - - dont_binarize_cols = copy(self.group_cols) or [] - if self.quant_field is not None: - dont_binarize_cols.append(self.quant_field) - - # if self.group_cols is not None: - prep_data = pd.get_dummies(prep_data.drop(columns=dont_binarize_cols), dummy_na=True) - prep_data = prep_data.merge(data[dont_binarize_cols], left_index=True, right_index=True) - - # fix if dummy_na is const=0 - dummies_cols = set(prep_data.columns) - set(init_cols) - const_columns = [col for col in dummies_cols if prep_data[col].nunique() <= 1] # choose constant_columns - - # drop constant dummy columns and info columns - cols_to_drop = const_columns + (self.info_cols if self.info_cols is not None else []) - clean_data = prep_data.drop(columns=cols_to_drop) - - return clean_data - - def __simple_mode(self, data: pd.DataFrame, random_state: int = None) -> Dict: - """Separates data on A and B samples within simple mode. - - Separation performed to divide groups of equal sizes - equal amount of records - or equal amount of groups in each sample. - - Args: - data: Input data - random_state: Seed of random - - Returns: - result: Test and control samples of indexes dictionary - """ - result = {"test_indexes": [], "control_indexes": []} - - if self.quant_field: - random_ids = shuffle(data[self.quant_field].unique(), random_state=random_state) - edge = len(random_ids) // 2 - result["test_indexes"] = list(data[data[self.quant_field].isin(random_ids[:edge])].index) - result["control_indexes"] = list(data[data[self.quant_field].isin(random_ids[edge:])].index) - - else: - addition_indexes = list(shuffle(data.index, random_state=random_state)) - edge = len(addition_indexes) // 2 - result["test_indexes"] = addition_indexes[:edge] - result["control_indexes"] = addition_indexes[edge:] - - return result - - def split(self, data: pd.DataFrame, preprocessing_data: bool = True, random_state: int = None) -> Dict: - """Divides sample on two groups. - - Args: - data: Raw input data - preprocessing_data: Flag to preprocess data - random_state: Seed of random - one integer to fix split - - Returns: - result: Dict of indexes with division on test and control group - """ - if preprocessing_data: - data = self._preprocessing_data(data) - result = {"test_indexes": [], "control_indexes": []} - - if self.group_cols: - groups = data.groupby(self.group_cols) - for _, gd in groups: - if self.mode not in ("balanced", "simple"): - warnings.warn( - f"The mode '{self.mode}' is not supported for group division. Implemented mode 'simple'." - ) - self.mode = "simple" - - if self.mode == "simple": - t_result = self.__simple_mode(gd, random_state) - result["test_indexes"] += t_result["test_indexes"] - result["control_indexes"] += t_result["control_indexes"] - - elif self.mode == "balanced": - if self.quant_field: - random_ids = shuffle(gd[self.quant_field].unique(), random_state=random_state) - addition_indexes = list(gd[gd[self.quant_field].isin(random_ids)].index) - else: - addition_indexes = list(shuffle(gd.index, random_state=random_state)) - - if len(result["control_indexes"]) > len(result["test_indexes"]): - result["test_indexes"] += addition_indexes - else: - result["control_indexes"] += addition_indexes - - else: - if self.mode != "simple": - warnings.warn( - f"The mode '{self.mode}' is not supported for regular division. " f"Implemented mode 'simple'." 
- ) - - t_result = self.__simple_mode(data, random_state) - result["test_indexes"] = t_result["test_indexes"] - result["control_indexes"] = t_result["control_indexes"] - - result["test_indexes"] = list(set(result["test_indexes"])) - result["control_indexes"] = list(set(result["control_indexes"])) - - return result - - @staticmethod - def _postprep_data(data, spit_indexes: Dict = None) -> pd.DataFrame: - """Prepares data to show user. - - Adds info_cols and decode binary variables - - Args: - data: Raw input data - spit_indexes: Dict of indexes with separation on test and control group - - Returns: - data: Separated initial data with column "group" - """ - # prep data to show user (add info_cols and decode binary variables) - test = data.loc[spit_indexes["test_indexes"]] - control = data.loc[spit_indexes["control_indexes"]] - data = merge_groups(test, control) - - return data - - @staticmethod - def calc_ab_delta(a_mean: float, b_mean: float, mode: str = "percentile"): - """Calculates target delta between A and B groups. - - Args: - a_mean: Average of target in one group - b_mean: Average of target in another group - mode: Type of expected result: - 'percentile' - percentage exceeding the average in group A compared to group B - 'absolute' - absolute value of difference between B and A group - 'relative' - percent in format of number (absolute) exceeding the average in group A compared to group B - - Returns: - result: Delta between groups as percent or absolute value - """ - if mode == "percentile": - result = (1 - a_mean / b_mean) * 100 - return result - if mode == "absolute": - result = b_mean - a_mean - return result - if mode == "relative": - result = 1 - a_mean / b_mean - return result - - def sampling_metrics( - self, data: pd.DataFrame, alpha: float = 0.05, random_state: int = None, preprocessed_data: pd.DataFrame = None - ): - """Calculates metrics of one sampling. 
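To make the AA workflow above concrete, a hedged usage sketch; the `revenue` and `region` columns and all sizes are invented for the example, and the last step uses `search_dist_uniform_sampling`, defined further below:

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
# Hypothetical dataset: one target metric and one stratification column.
df = pd.DataFrame({
    "revenue": rng.normal(100, 10, size=1_000),
    "region": rng.choice(["north", "south"], size=1_000),
})

aa = AATest(target_fields="revenue", group_cols="region")

# One deterministic split into equally sized halves, stratified by region.
split = aa.split(df, random_state=42)
test_df, control_df = df.loc[split["test_indexes"]], df.loc[split["control_indexes"]]

# Score 50 candidate splits and keep the most homogeneous ones.
metrics, experiments = aa.search_dist_uniform_sampling(df, iterations=50, pbar=False)
best = metrics.sort_values("mean_tests_score", ascending=False).head()
```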
- - Args: - data: Raw input data - alpha: Threshold to check statistical hypothesis; usually 0.05 - random_state: Random seeds for searching - preprocessed_data: Pre-preprocessed data - - Returns: - result: Tuple of - 1) metrics dataframe (stat tests) and - 2) dict of random state with test_control dataframe - """ - - data_from_sampling_dict = {} - scores = [] - t_result = {"random_state": random_state} - - split = self.split(data, preprocessed_data is None, random_state) - - a = data.loc[split["test_indexes"]] - b = data.loc[split["control_indexes"]] - - # prep data to show user (merge indexes and init data) - data_from_sampling_dict[random_state] = self._postprep_data(data, split) - - for tf in self.target_fields: - ta = a[tf] - tb = b[tf] - - t_result[f"{tf} a mean"] = ta.mean() - t_result[f"{tf} b mean"] = tb.mean() - t_result[f"{tf} ab delta"] = self.calc_ab_delta( - t_result[f"{tf} a mean"], t_result[f"{tf} b mean"], "absolute" - ) - t_result[f"{tf} ab delta %"] = self.calc_ab_delta( - t_result[f"{tf} a mean"], t_result[f"{tf} b mean"], "percentile" - ) - t_result[f"{tf} t_test p_value"] = ttest_ind(ta, tb, nan_policy="omit").pvalue - t_result[f"{tf} ks_test p_value"] = ks_2samp(ta, tb).pvalue - t_result[f"{tf} t_test passed"] = t_result[f"{tf} t_test p_value"] > alpha - t_result[f"{tf} ks_test passed"] = t_result[f"{tf} ks_test p_value"] > alpha - scores.append((t_result[f"{tf} t_test p_value"] + t_result[f"{tf} ks_test p_value"]) / 2) - - t_result["mean_tests_score"] = np.mean(scores) - result = {"metrics": t_result, "data_from_experiment": data_from_sampling_dict} - - return result - - def search_dist_uniform_sampling( - self, - data: pd.DataFrame, - alpha: float = 0.05, - iterations: int = 10, - file_name: Union[Path, str] = None, - write_mode: str = "full", - write_step: int = None, - pbar: bool = True, - ) -> Optional[Tuple[pd.DataFrame, Dict[Any, Dict]]]: - """Chooses random_state for finding homogeneous distribution. - - Args: - data: Raw input data - alpha: - Threshold to check statistical hypothesis; usually 0.05 - iterations: - Number of iterations to search uniform sampling to searching - file_name: - Name of file to save results (if None - no results will be saved, func returns result) - write_mode: - Mode to write: - 'full' - save all experiments - 'all' - save experiments that passed all statistical tests - 'any' - save experiments that passed any statistical test - write_step: - Step to write experiments to file - pbar: - Flag to show progress bar - - Returns: - results: - If no saving (no file_name, no write mode and no write_step) returns dataframe - else None and saves file to csv - """ - random_states = range(iterations) - results = [] - data_from_sampling = {} - - preprocessed_data = self._preprocessing_data(data) - - if write_mode not in ("full", "all", "any"): - warnings.warn(f"Write mode '{write_mode}' is not supported. 
Mode 'full' will be used") - write_mode = "full" - - for i, rs in tqdm(enumerate(random_states), total=len(random_states), disable=not pbar): - res = self.sampling_metrics(data, alpha=alpha, random_state=rs, preprocessed_data=preprocessed_data) - data_from_sampling.update(res["data_from_experiment"]) - - # write to file - passed = [] - for tf in self.target_fields: - passed += [res["metrics"][f"{tf} t_test passed"], res["metrics"][f"{tf} ks_test passed"]] - - if write_mode == "all" and all(passed): - results.append(res["metrics"]) - if write_mode == "any" and any(passed): - results.append(res["metrics"]) - if write_mode == "full": - results.append(res["metrics"]) - - if file_name and write_step: - if i == write_step: - pd.DataFrame(results).to_csv(file_name, index=False) - elif i % write_step == 0: - pd.DataFrame(results).to_csv(file_name, index=False, header=False, mode="a") - results = [] - - if file_name and write_step: - pd.DataFrame(results).to_csv(file_name, index=False, header=False, mode="a") - elif file_name: - results = pd.DataFrame(results) - results.to_csv(file_name, index=False) - return results, data_from_sampling - else: - return pd.DataFrame(results), data_from_sampling - - -class ABTest: - def __init__( - self, calc_difference_method: str = "all", calc_p_value_method: str = "all", - ): - """Initializes the ABTest class. - - Args: - calc_difference_method: - The method used to calculate the difference: - 'all' [default] - all metrics - 'ate' - basic difference in means of targets in test and control group - 'diff_in_diff' - difference in difference value, - performs pre-post analysis (required values of target before pilot) - 'cuped' - Controlled-Experiment using Pre-Experiment Data value, - performs pre-post analysis (required values of target before pilot) - calc_p_value_method: - The method used to calculate the p-value. Defaults to 'all' - """ - self.calc_difference_method = calc_difference_method - self.calc_p_value_method = calc_p_value_method - self.results = None - - @staticmethod - def split_ab(data: pd.DataFrame, group_field: str) -> Dict[str, pd.DataFrame]: - """Splits a pandas DataFrame into two separate dataframes based on a specified group field. - - Args: - data: - The input dataframe to be split - group_field: - The column name representing the group field - - Returns: - splitted_data: - A dictionary containing two dataframes, 'test' and 'control', where 'test' contains rows where the - group field is 'test', and 'control' contains rows where the group field is 'control'. - """ - splitted_data = { - "test": data[data[group_field] == "test"], - "control": data[data[group_field] == "control"], - } - return splitted_data - - @staticmethod - def cuped( - test_data: pd.DataFrame, control_data: pd.DataFrame, target_field: str, target_field_before: str - ) -> float: - """Counts CUPED (Controlled-Experiment using Pre-Experiment Data) in absolute values. 
- - Metric uses pre-post analysis of target, uses to minimize variance of effect: - ATE = mean(test_cuped) - mean(control_cuped) - , where - test_cuped = target__test - theta * target_before__test - control_cuped = target__control - theta * target_before__control - , where - theta = (cov_test + cov_control) / (var_test + var_control) - , where - cov_test = cov(target__test, target_before__test) - cov_control = cov(target__control, target_before__control) - var_test = var(target_before__test) - var_control = var(target_before__control) - - Args: - test_data: - Input data of test group - Should include target before and after pilot - control_data: - Input data of control group - Should include target before and after pilot - target_field: - Column name of target after pilot - target_field_before: - Column name of target before pilot - - Returns: - result: - Named tuple with pvalue, effect, ci_length, left_bound and right_bound - """ - control = control_data[target_field] - control_before = control_data[target_field_before] - test = test_data[target_field] - test_before = test_data[target_field_before] - - theta = (np.cov(control, control_before)[0, 1] + np.cov(test, test_before)[0, 1]) / ( - np.var(control_before) + np.var(test_before) - ) - - control_cuped = control - theta * control_before - test_cuped = test - theta * test_before - - mean_control = np.mean(control_cuped) - mean_test = np.mean(test_cuped) - - difference_mean = mean_test - mean_control - - return difference_mean - - @staticmethod - def diff_in_diff( - test_data: pd.DataFrame, control_data: pd.DataFrame, target_field: str, target_field_before: str - ) -> float: - """Counts Difference in Difference. - - Metric uses pre-post analysis and counts difference in means in data before and after pilot: - ATE = (y_test_after - y_control_after) - (y_test_before - y_control_before) - - Args: - test_data: Input data of test group - control_data: Input data of control group - target_field: Column name of target after pilot - target_field_before: Column name of target before pilot - - Returns: - did: Value of difference in difference - """ - mean_test = np.mean(test_data[target_field]) - mean_control = np.mean(control_data[target_field]) - - mean_test_before = np.mean(test_data[target_field_before]) - mean_control_before = np.mean(control_data[target_field_before]) - did = (mean_test - mean_control) - (mean_test_before - mean_control_before) - - return did - - def calc_difference( - self, splitted_data: Dict[str, pd.DataFrame], target_field: str, target_field_before: str = None - ) -> Dict[str, float]: - """Calculates the difference between the target field values of the 'test' and 'control' dataframes. 
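The CUPED and diff-in-diff estimators above are static methods, so they can be exercised directly on synthetic data; a minimal sketch with hypothetical column names and a simulated lift of 2.0:

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(7)

def make_group(lift: float, n: int = 500) -> pd.DataFrame:
    """Simulate a pre-pilot metric and a post-pilot metric with a given lift."""
    before = rng.normal(100, 10, size=n)
    return pd.DataFrame({"metric_before": before,
                         "metric": before + rng.normal(lift, 5, size=n)})

test, control = make_group(lift=2.0), make_group(lift=0.0)

ate_cuped = ABTest.cuped(test, control, "metric", "metric_before")
ate_did = ABTest.diff_in_diff(test, control, "metric", "metric_before")
# Both estimates should land near the simulated lift of 2.0.
```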
- - Args: - splitted_data: - A dictionary containing the 'test' and 'control' dataframes - target_field: - The name of the target field contains data after pilot - target_field_before: - The name of the target field contains data before pilot - - Returns: - result: - A dictionary containing the difference between the target field - values of the 'test' and 'control' dataframes - """ - result = {} - if self.calc_difference_method in {"all", "diff_in_diff", "cuped"}: - if target_field_before is None: - raise ValueError( - "For calculation metrics 'cuped' or 'diff_in_diff' field 'target_field_before' is required.\n" - "Metric 'ate'(=diff-in-means) can be used without 'target_field_before'" - ) - - if self.calc_difference_method in {"all", "ate"}: - result["ate"] = ( - splitted_data["test"][target_field].values - splitted_data["control"][target_field].values - ).mean() - - if self.calc_difference_method in {"all", "cuped"}: - result["cuped"] = self.cuped( - test_data=splitted_data["test"], - control_data=splitted_data["control"], - target_field=target_field, - target_field_before=target_field_before, - ) - - if self.calc_difference_method in {"all", "diff_in_diff"}: - result["diff_in_diff"] = self.diff_in_diff( - test_data=splitted_data["test"], - control_data=splitted_data["control"], - target_field=target_field, - target_field_before=target_field_before, - ) - - return result - - def calc_p_value(self, splitted_data: Dict[str, pd.DataFrame], target_field: str) -> Dict[str, float]: - """Calculates the p-value for a given data set. - - Args: - splitted_data: - A dictionary containing the split data, where the keys are 'test' and 'control' - and the values are pandas DataFrames - target_field: - The name of the target field - Returns: - result: - A dictionary containing the calculated p-values, where the keys are 't_test' and 'mann_whitney' - and the values are the corresponding p-values - """ - result = {} - if self.calc_p_value_method in {"all", "t_test"}: - result["t_test"] = ttest_ind( - splitted_data["test"][target_field], splitted_data["control"][target_field], - ).pvalue - - if self.calc_p_value_method in {"all", "mann_whitney"}: - result["mann_whitney"] = mannwhitneyu( - splitted_data["test"][target_field], splitted_data["control"][target_field], - ).pvalue - - return result - - def execute( - self, data: pd.DataFrame, target_field: str, group_field: str, target_field_before: str = None - ) -> Dict[str, Dict[str, float]]: - """Splits the input data based on the group field and calculates the size, difference, and p-value. 
- - Parameters: - data: Input data as a pandas DataFrame - target_field: Target field to be analyzed - group_field: Field used to split the data into groups - target_field_before: Target field without treatment to be analyzed - - Returns: - results: - A dictionary containing the size, difference, and p-value of the split data - 'size': A dictionary with the sizes of the test and control groups - 'difference': A dictionary with the calculated differences between the groups - 'p_value': A dictionary with the calculated p-values for each group - """ - splitted_data = self.split_ab(data, group_field) - - results = { - "size": {"test": len(splitted_data["test"]), "control": len(splitted_data["control"])}, - "difference": self.calc_difference(splitted_data, target_field, target_field_before), - "p_value": self.calc_p_value(splitted_data, target_field), - } - - self.results = results - - return results - - def show_beautiful_result(self): - """Shows results of 'execute' function - dict as dataframes.""" - for k in self.results.keys(): - display(pd.DataFrame(self.results[k], index=[k]).T) diff --git a/hypex/algorithms/__init__.py b/hypex/algorithms/__init__.py deleted file mode 100644 index b4e68c97..00000000 --- a/hypex/algorithms/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from hypex.algorithms.faiss_matcher import FaissMatcher -from hypex.algorithms.no_replacement_matching import MatcherNoReplacement - -__all__ = ['FaissMatcher', "MatcherNoReplacement"] \ No newline at end of file diff --git a/hypex/algorithms/faiss_matcher.py b/hypex/algorithms/faiss_matcher.py deleted file mode 100644 index f6320307..00000000 --- a/hypex/algorithms/faiss_matcher.py +++ /dev/null @@ -1,962 +0,0 @@ -"""Class that searches indexes.""" -import datetime as dt -import functools -import logging -import time -from typing import Any -from typing import Dict -from typing import Tuple -from typing import Union - -import faiss -import numpy as np -import pandas as pd -from scipy.stats import norm -from tqdm.auto import tqdm - -from ..utils.metrics import check_repeats -from ..utils.metrics import matching_quality - - -def timer(func): - """Decorator to measure the execution time of a function. - - Uses time.perf_counter() to determine the start and end times - of the decorated function and then prints the total execution time - - Usage Example: - - @timer - def example_function(): - ... 
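Continuing that sketch, an end-to-end run of `ABTest.execute` (equal group sizes are assumed here, since the 'ate' branch subtracts the two groups' value arrays element-wise; `show_beautiful_result` renders in a notebook):

```python
ab = ABTest(calc_difference_method="all", calc_p_value_method="all")
data = merge_groups(test, control)  # reuses the frames from the CUPED sketch

results = ab.execute(
    data,
    target_field="metric",
    group_field="group",
    target_field_before="metric_before",
)
ab.show_beautiful_result()  # one small DataFrame per block: size, difference, p_value
```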
- - Args: - func: The function whose execution time is to be measured - - Returns: - Wrapped version of the original function with added time measurement - """ - - @functools.wraps(func) - def _wrapper(*args, **kwargs): - start = time.perf_counter() - result = func(*args, **kwargs) - runtime = time.perf_counter() - start - print(f"{func.__name__} took {runtime:.4f} secs") - return result - - return _wrapper - - -faiss.cvar.distance_compute_blas_threshold = 100000 -POSTFIX = "_matched" -POSTFIX_BIAS = "_matched_bias" - -logger = logging.getLogger("Faiss hypex") -console_out = logging.StreamHandler() -logging.basicConfig( - handlers=(console_out,), - format="[%(asctime)s | %(name)s | %(levelname)s]: %(message)s", - datefmt="%d.%m.%Y %H:%M:%S", - level=logging.INFO, -) - - -class FaissMatcher: - """A class used to match instances using the Faiss library.""" - - def __init__( - self, - df: pd.DataFrame, - outcomes: str, - treatment: str, - info_col: list, - features: [list, pd.DataFrame] = None, - group_col: str = None, - weights: dict = None, - sigma: float = 1.96, - validation: bool = None, - n_neighbors: int = 10, - silent: bool = True, - pbar: bool = True, - ): - """Construct all the necessary attributes. - - Args: - df: - The input dataframe - outcomes: - The target column name - treatment: - The column name with treatment - info_col: - A list with informational column names - features: - A list of feature names used for matching. Defaults to None - group_col: - The column for stratification. Defaults to None - weights: - A dict of feature weights for matching, used to make the matching rely more on - some features and less on others - sigma: - The significance level for confidence interval calculation. Defaults to 1.96 - validation: - The flag for validation of estimated ATE with default method `random_feature` - n_neighbors: - The number of neighbors to find for each object.
Defaults to 10 - silent: - Write logs in debug mode - pbar: - Display progress bar while get index - """ - self.n_neighbors = n_neighbors - if group_col is None: - self.df = df - else: - self.df = df.sort_values([treatment, group_col]) - self.columns_del = [outcomes] - if info_col: - self.info_col = info_col - else: - self.info_col = [] - - if self.info_col is not None: - self.columns_del = self.columns_del + [x for x in self.info_col if x in self.df.columns] - self.outcomes = outcomes if type(outcomes) == list else [outcomes] - self.treatment = treatment - - if features is None: - self.columns_match = list( - set([x for x in list(self.df.columns) if x not in self.info_col] + [self.treatment] + self.outcomes) - ) - else: - try: - self.columns_match = features["Feature"].tolist() + [self.treatment] + self.outcomes - except TypeError: - self.columns_match = features + [self.treatment] + self.outcomes - - self.features_quality = ( - self.df.drop(columns=[self.treatment] + self.outcomes + self.info_col) - .select_dtypes(include=["int16", "int32", "int64", "float16", "float32", "float64"]) - .columns - ) - self.dict_outcome_untreated = {} - self.dict_outcome_treated = {} - self.group_col = group_col - self.weights = weights - self.treated_index = None - self.untreated_index = None - self.orig_treated_index = None - self.orig_untreated_index = None - self.results = {} - self.ATE = None - self.sigma = sigma - self.quality_dict = {} - self.rep_dict = None - self.validation = validation - self.silent = silent - self.pbar = pbar - self.tqdm = None - self.results = pd.DataFrame() - - def __getstate__(self) -> dict: - """Prepare the object for serialization. - - This method is called when the object is about to be serialized. - It removes the `tqdm` attribute from the object's dictionary - because `tqdm` objects cannot be serialized. - - Returns: - A copy of the object's dictionary with the `tqdm` attribute removed. - """ - state = self.__dict__.copy() - if "tqdm" in state: - del state["tqdm"] - return state - - def __setstate__(self, state: dict): - """Restore the object after deserialization. - - This method is called when the object is deserialized. - It adds the `tqdm` attribute back to the object's dictionary - if the `pbar` attribute is True. - - Args: - state: - The deserialized state of the object - """ - if "pbar" in state and state["pbar"]: - state["tqdm"] = None - self.__dict__.update(state) - - def _get_split(self, df: pd.DataFrame) -> (pd.DataFrame, pd.DataFrame): - """Creates split data by treatment column. - - Separate treatment column with 1 (treated) an 0 (untreated), - scales and transforms treatment column - - Args: - df: - The input dataframe - - Returns: - Tuple of dataframes - one for treated (df[self.treatment] == 1]) and - one for untreated (df[self.treatment] == 0]). Drops self.outcomes and - `self.treatment` columns - - """ - logger.debug("Creating split data by treatment column") - - treated = df[df[self.treatment] == 1].drop([self.treatment] + self.outcomes, axis=1) - untreated = df[df[self.treatment] == 0].drop([self.treatment] + self.outcomes, axis=1) - - return treated, untreated - - def _predict_outcome(self, std_treated: pd.DataFrame, std_untreated: pd.DataFrame): - """Applies LinearRegression to input arrays. - - Calculate biases of treated and untreated values, - creates dict of y - regular, matched and without bias. 
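The bias correction mentioned here (implemented by `bias_coefs` and `bias` later in this module) reduces to projecting the matching discrepancy through regression coefficients; a toy sketch with made-up numbers:

```python
import numpy as np

# Hypothetical coefficients from a bias-correction regression, original
# covariates X, and the covariates of each unit's matched counterpart X_m.
coefs = np.array([0.5, -0.2])
X = np.array([[1.0, 2.0], [3.0, 1.0]])
X_m = np.array([[1.5, 2.0], [2.0, 1.0]])

correction = np.array([(x_j - x_i) @ coefs for x_i, x_j in zip(X, X_m)])
# -> [0.25, -0.5]; this is exactly what bias() computes pairwise.
```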
- - Args: - std_treated: - The dataframe of treated data - std_untreated: - The dataframe of untreated data - - """ - logger.debug("Predicting target by Linear Regression") - - start_time = dt.datetime.now() - logger.debug("start --") - - self.dict_outcome_untreated = {} - self.dict_outcome_treated = {} - df = self.df.drop(columns=self.info_col) - - for outcome in self.outcomes: - y_untreated = df[df[self.treatment] == 0][outcome].to_numpy() - y_treated = df[df[self.treatment] == 1][outcome].to_numpy() - - x_treated = std_treated.to_numpy() - x_untreated = std_untreated.to_numpy() - y_match_treated = np.array([y_untreated[idx].mean() for idx in self.treated_index]) - y_match_untreated = np.array([y_treated[idx].mean() for idx in self.untreated_index]) - x_match_treated = np.array([x_untreated[idx].mean(0) for idx in self.treated_index]) - x_match_untreated = np.array([x_treated[idx].mean(0) for idx in self.untreated_index]) - bias_coefs_c = bias_coefs(self.untreated_index, y_treated, x_treated) - bias_coefs_t = bias_coefs(self.treated_index, y_untreated, x_untreated) - bias_c = bias(x_untreated, x_match_untreated, bias_coefs_c) - bias_t = bias(x_treated, x_match_treated, bias_coefs_t) - - y_match_treated_bias = y_treated - y_match_treated + bias_t - y_match_untreated_bias = y_match_untreated - y_untreated - bias_c - - self.dict_outcome_untreated[outcome] = y_untreated - self.dict_outcome_untreated[outcome + POSTFIX] = y_match_untreated - self.dict_outcome_untreated[outcome + POSTFIX_BIAS] = y_match_untreated_bias - - self.dict_outcome_treated[outcome] = y_treated - self.dict_outcome_treated[outcome + POSTFIX] = y_match_treated - self.dict_outcome_treated[outcome + POSTFIX_BIAS] = y_match_treated_bias - - end_time = dt.datetime.now() - total = dt.datetime.strptime(str(end_time - start_time), "%H:%M:%S.%f").strftime("%H:%M:%S") - logger.debug(f"end -- [work time{total}]") - - def _create_outcome_matched_df(self, dict_outcome: dict, is_treated: bool) -> pd.DataFrame: - """Creates dataframe with outcomes values and treatment. - - Args: - dict_outcome: - A dictionary containing outcomes - is_treated: - A boolean value indicating whether the outcome is treated or not - - Returns: - A dataframe with matched outcome and treatment columns - - """ - df_pred = pd.DataFrame(dict_outcome) - df_pred[self.treatment] = int(is_treated) - df_pred[self.treatment + POSTFIX] = int(not is_treated) - - return df_pred - - def _create_features_matched_df(self, index: np.ndarray, is_treated: bool) -> pd.DataFrame: - """Creates matched dataframe with features. 
- - Args: - index: - An array of indices - is_treated: - A boolean value indicating whether the outcome is treated or not - - - Returns: - A dataframe of matched features - - """ - df = self.df.drop(columns=self.outcomes + self.info_col) - - if self.group_col is None: - untreated_index = df[df[self.treatment] == int(not is_treated)].index.to_numpy() - converted_index = [untreated_index[i] for i in index] - filtered = df.loc[df[self.treatment] == int(not is_treated)].values - untreated_df = pd.DataFrame( - data=np.array([filtered[idx].mean(axis=0) for idx in index]), columns=df.columns - ) # TODO: add the date to the data and fix the related bugs (this used to break here) - if self.info_col is not None and len(self.info_col) != 1: - untreated_df["index"] = pd.Series(converted_index) - treated_df = df[df[self.treatment] == int(is_treated)].reset_index() - else: - ids = self.df[df[self.treatment] == int(not is_treated)][self.info_col].values.ravel() - converted_index = [ids[i] for i in index] - untreated_df["index"] = pd.Series(converted_index) - treated_df = df[df[self.treatment] == int(is_treated)].reset_index() - treated_df["index"] = self.df[self.df[self.treatment] == int(is_treated)][self.info_col].values.ravel() - else: - df = df.sort_values([self.treatment, self.group_col]) - untreated_index = df[df[self.treatment] == int(not is_treated)].index.to_numpy() - converted_index = [untreated_index[i] for i in index] - filtered = df.loc[df[self.treatment] == int(not is_treated)] - cols_untreated = [col for col in filtered.columns if col != self.group_col] - filtered = filtered.drop(columns=self.group_col).to_numpy() - untreated_df = pd.DataFrame( - data=np.array([filtered[idx].mean(axis=0) for idx in index]), columns=cols_untreated - ) - treated_df = df[df[self.treatment] == int(is_treated)].reset_index() - grp = treated_df[self.group_col] - untreated_df[self.group_col] = grp - if self.info_col is not None and len(self.info_col) != 1: - untreated_df["index"] = pd.Series(converted_index) - else: - ids = ( - self.df[df[self.treatment] == int(not is_treated)] - .sort_values([self.treatment, self.group_col])[self.info_col] - .values.ravel() - ) - converted_index = [ids[i] for i in index] - untreated_df["index"] = pd.Series(converted_index) - treated_df["index"] = self.df[self.df[self.treatment] == int(is_treated)][self.info_col].values.ravel() - untreated_df.columns = [col + POSTFIX for col in untreated_df.columns] - - x = pd.concat([treated_df, untreated_df], axis=1).drop( - columns=[self.treatment, self.treatment + POSTFIX], axis=1 - ) - return x - - def _create_matched_df(self) -> pd.DataFrame: - """Creates matched df of features and outcomes. - - Returns: - Matched dataframe - """ - df_pred_treated = self._create_outcome_matched_df(self.dict_outcome_treated, True) - df_pred_untreated = self._create_outcome_matched_df(self.dict_outcome_untreated, False) - - df_matched = pd.concat([df_pred_treated, df_pred_untreated]) - - treated_x = self._create_features_matched_df(self.treated_index, True) - untreated_x = self._create_features_matched_df(self.untreated_index, False) - - untreated_x = pd.concat([treated_x, untreated_x]) - - columns = list(untreated_x.columns) + list(df_matched.columns) - - df_matched = pd.concat([untreated_x, df_matched], axis=1, ignore_index=True) - df_matched.columns = columns - - return df_matched - - def calc_atc(self, df: pd.DataFrame, outcome: str) -> tuple: - """Calculates Average Treatment Effect for the control group (ATC).
- - The effect on the control group had it been treated - - Args: - df: - Input dataframe - outcome: - The outcome to be considered for treatment effect - - Returns: - Contains ATC, scaled counts, and variances as numpy arrays - - """ - logger.debug("Calculating ATC") - - df = df[df[self.treatment] == 0] - N_c = len(df) - ITT_c = df[outcome + POSTFIX_BIAS] - scaled_counts_c = scaled_counts(N_c, self.treated_index, self.silent) - - vars_c = np.repeat(ITT_c.var(), N_c) # conservative - atc = ITT_c.mean() - - return atc, scaled_counts_c, vars_c - - def calc_att(self, df: pd.DataFrame, outcome: str) -> tuple: - """Calculates Average Treatment Effect for the treated (ATT). - - Args: - df: - Input dataframe - outcome: - The outcome to be considered for treatment effect - - Returns: - Contains ATT, scaled counts, and variances as numpy arrays - - """ - logger.debug("Calculating ATT") - - df = df[df[self.treatment] == 1] - N_t = len(df) - ITT_t = df[outcome + POSTFIX_BIAS] - scaled_counts_t = scaled_counts(N_t, self.untreated_index, self.silent) - - vars_t = np.repeat(ITT_t.var(), N_t) # conservative - att = ITT_t.mean() - - return att, scaled_counts_t, vars_t - - def _calculate_ate_all_target(self, df: pd.DataFrame): - """Creates dictionaries of all effects: ATE, ATC, ATT. - - Args: - df: - Input dataframe - - """ - logger.debug("Creating dicts of all effects: ATE, ATC, ATT") - - att_dict = {} - atc_dict = {} - ate_dict = {} - N = len(df) - N_t = df[self.treatment].sum() - N_c = N - N_t - - for outcome in self.outcomes: - att, scaled_counts_t, vars_t = self.calc_att(df, outcome) - atc, scaled_counts_c, vars_c = self.calc_atc(df, outcome) - ate = (N_c / N) * atc + (N_t / N) * att - - att_se = calc_att_se(vars_c, vars_t, scaled_counts_c) - atc_se = calc_atc_se(vars_c, vars_t, scaled_counts_t) - ate_se = calc_ate_se(vars_c, vars_t, scaled_counts_c, scaled_counts_t) - - ate_dict[outcome] = [ - ate, - ate_se, - pval_calc(ate / ate_se), - ate - self.sigma * ate_se, - ate + self.sigma * ate_se, - ] - atc_dict[outcome] = [ - atc, - atc_se, - pval_calc(atc / atc_se), - atc - self.sigma * atc_se, - atc + self.sigma * atc_se, - ] - att_dict[outcome] = [ - att, - att_se, - pval_calc(att / att_se), - att - self.sigma * att_se, - att + self.sigma * att_se, - ] - - self.ATE, self.ATC, self.ATT = ate_dict, atc_dict, att_dict - self.val_dict = ate_dict - - def matching_quality(self, df_matched) -> Dict[str, Union[Dict[str, float], float]]: - """Estimates the quality of covariate balance and the repeat fraction. - - Calculates the population stability index, standardized mean difference - and Kolmogorov-Smirnov test for numeric values. Returns a dictionary of reports.
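The ATE line above is just a group-size-weighted blend of the two one-sided effects; with illustrative numbers:

```python
# ATE as the size-weighted average of ATC and ATT (numbers are made up).
N_t, N_c = 300, 700
att, atc = 2.5, 1.8
N = N_t + N_c
ate = (N_c / N) * atc + (N_t / N) * att  # 0.7 * 1.8 + 0.3 * 2.5 = 2.01
```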
- - Args: - df_matched: - Matched DataFrame to calculate quality - - Returns: - dictionary containing PSI, KS-test, SMD data and repeat fractions - - """ - if self.silent: - logger.debug("Estimating quality of matching") - else: - logger.info("Estimating quality of matching") - - psi_columns = set(self.columns_match) - psi_columns = list(psi_columns - set([self.treatment] + self.outcomes)) - psi_data, ks_data, smd_data = matching_quality( - df_matched, self.treatment, sorted(self.features_quality), sorted(psi_columns), self.silent - ) - - rep_dict = { - "match_control_to_treat": check_repeats(np.concatenate(self.treated_index), silent=self.silent), - "match_treat_to_control": check_repeats(np.concatenate(self.untreated_index), silent=self.silent), - } - - self.quality_dict = {"psi": psi_data, "ks_test": ks_data, "smd": smd_data, "repeats": rep_dict} - - rep_df = pd.DataFrame.from_dict(rep_dict, orient="index").rename(columns={0: "value"}) - self.rep_dict = rep_df - - if self.silent: - logger.debug(f"PSI info: \n {psi_data.head(10)} \nshape:{psi_data.shape}") - logger.debug(f"Kolmogorov-Smirnov test info: \n {ks_data.head(10)} \nshape:{ks_data.shape}") - logger.debug(f"Standardised mean difference info: \n {smd_data.head(10)} \nshape:{smd_data.shape}") - logger.debug(f"Repeats info: \n {rep_df.head(10)}") - else: - logger.info(f"PSI info: \n {psi_data.head(10)} \nshape:{psi_data.shape}") - logger.info(f"Kolmogorov-Smirnov test info: \n {ks_data.head(10)} \nshape:{ks_data.shape}") - logger.info(f"Standardised mean difference info: \n {smd_data.head(10)} \nshape:{smd_data.shape}") - logger.info(f"Repeats info: \n {rep_df.head(10)}") - - return self.quality_dict - - def group_match(self): - """Matches the dataframe if it divided by groups. - - Returns: - A tuple containing the matched dataframe and metrics such as ATE, ATT and ATC - - """ - df = self.df.drop(columns=self.info_col) - groups = sorted(df[self.group_col].unique()) - matches_c = [] - matches_t = [] - group_arr_c = df[df[self.treatment] == 0][self.group_col].to_numpy() - group_arr_t = df[df[self.treatment] == 1][self.group_col].to_numpy() - treat_arr_c = df[df[self.treatment] == 0][self.treatment].to_numpy() - treat_arr_t = df[df[self.treatment] == 1][self.treatment].to_numpy() - - if self.pbar: - self.tqdm = tqdm(total=len(groups) * 2) - - for group in groups: - df_group = df[df[self.group_col] == group] - temp = df_group[self.columns_match + [self.group_col]] - temp = temp.loc[:, (temp != 0).any(axis=0)].drop(columns=self.group_col) - treated, untreated = self._get_split(temp) - - std_treated_np, std_untreated_np = _transform_to_np(treated, untreated, self.weights) - - if self.pbar: - self.tqdm.set_description(desc=f"Get untreated index by group {group}") - matches_u_i = _get_index(std_treated_np, std_untreated_np, self.n_neighbors) - - if self.pbar: - self.tqdm.update(1) - self.tqdm.set_description(desc=f"Get treated index by group {group}") - matches_t_i = _get_index(std_untreated_np, std_treated_np, self.n_neighbors) - if self.pbar: - self.tqdm.update(1) - self.tqdm.refresh() - - group_mask_c = group_arr_c == group - group_mask_t = group_arr_t == group - matches_c_mask = np.arange(treat_arr_t.shape[0])[group_mask_t] - matches_u_i = [matches_c_mask[i] for i in matches_u_i] - matches_t_mask = np.arange(treat_arr_c.shape[0])[group_mask_c] - matches_t_i = [matches_t_mask[i] for i in matches_t_i] - matches_c.extend(matches_u_i) - matches_t.extend(matches_t_i) - - if self.pbar: - self.tqdm.close() - - self.untreated_index = matches_c 
- self.treated_index = matches_t - - df_group = df[self.columns_match].drop(columns=self.group_col) - treated, untreated = self._get_split(df_group) - self._predict_outcome(treated, untreated) - df_matched = self._create_matched_df() - self._calculate_ate_all_target(df_matched) - - if self.validation: - return self.val_dict - - return self.report_view(), df_matched - - def match(self): - """Matches the dataframe. - - Returns: - A tuple containing the matched dataframe and metrics such as ATE, ATT and ATC - - """ - if self.group_col is not None: - return self.group_match() - - df = self.df[self.columns_match] - treated, untreated = self._get_split(df) - - std_treated_np, std_untreated_np = _transform_to_np(treated, untreated, self.weights) - - if self.pbar: - self.tqdm = tqdm(total=len(std_treated_np) + len(std_untreated_np)) - self.tqdm.set_description(desc="Get untreated index") - - untreated_index = _get_index(std_treated_np, std_untreated_np, self.n_neighbors) - - if self.pbar: - self.tqdm.update(len(std_treated_np)) - self.tqdm.set_description(desc="Get treated index") - treated_index = _get_index(std_untreated_np, std_treated_np, self.n_neighbors) - - if self.pbar: - self.tqdm.update(len(std_untreated_np)) - self.tqdm.refresh() - self.tqdm.close() - - self.untreated_index = untreated_index - self.treated_index = treated_index - - self._predict_outcome(treated, untreated) - - df_matched = self._create_matched_df() - self._calculate_ate_all_target(df_matched) - - if self.validation: - return self.val_dict - - return self.report_view(), df_matched - - def report_view(self) -> pd.DataFrame: - """Formats the ATE, ATC, and ATT results into a Pandas DataFrame for easy viewing. - - Returns: - DataFrame containing ATE, ATC, and ATT results - """ - result = (self.ATE, self.ATC, self.ATT) - - for outcome in self.outcomes: - res = pd.DataFrame( - [x[outcome] + [outcome] for x in result], - columns=["effect_size", "std_err", "p-val", "ci_lower", "ci_upper", "outcome"], - index=["ATE", "ATC", "ATT"], - ) - self.results = pd.concat([self.results, res]) - return self.results - - -def map_func(x: np.ndarray) -> np.ndarray: - """Get the indices of elements in an array that are equal to the first element. - - Args: - x: - An input array. - - Returns: - Array of indices where the elements match the first element of x. - """ - return np.where(x == x[0])[0] - - -def f2(x: np.ndarray, y: np.ndarray) -> Any: - """Index an array using a secondary array of indices. - - Args: - x: - An input array. - y: - Array of indices used for indexing x. - - Returns: - Indexed element from the input array x. - """ - return x[y] - - -def _get_index(base: np.ndarray, new: np.ndarray, n_neighbors: int) -> list: - """Gets array of indexes that match a new array. - - Args: - base: - A numpy array serving as the reference for matching - new: - A numpy array that needs to be matched with the base - n_neighbors: - The number of neighbors to use for the matching - - Returns: - An array of indexes containing all neighbours with minimum distance - """ - index = faiss.IndexFlatL2(base.shape[1]) - index.add(base) - dist, indexes = index.search(new, 20) - if n_neighbors == 1: - equal_dist = list(map(map_func, dist)) - indexes = [f2(i, j) for i, j in zip(indexes, equal_dist)] - else: - indexes = f3(indexes, dist, n_neighbors) - return indexes - - -def _transform_to_np(treated: pd.DataFrame, untreated: pd.DataFrame, weights: dict) -> Tuple[np.ndarray, np.ndarray]: - """Transforms df to numpy and transform via Cholesky decomposition. 
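Putting the class together, a hedged end-to-end sketch on synthetic data (every column name and the data-generating process are invented for illustration, and exact numbers will vary):

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
n = 1_000
# Hypothetical observational data: two confounders, a treatment whose
# probability depends on age, and an outcome with a true effect of 5.
age = rng.normal(40, 10, n)
income = rng.normal(60, 15, n)
treated = (rng.random(n) < 1 / (1 + np.exp(-(age - 40) / 10))).astype(int)
outcome = 0.5 * age + 0.3 * income + 5 * treated + rng.normal(0, 5, n)

df = pd.DataFrame({"user_id": np.arange(n), "age": age, "income": income,
                   "treated": treated, "outcome": outcome})

matcher = FaissMatcher(df, outcomes="outcome", treatment="treated",
                       info_col=["user_id"], n_neighbors=5)
report, df_matched = matcher.match()            # report rows: ATE, ATC, ATT
quality = matcher.matching_quality(df_matched)  # PSI / KS / SMD / repeats
```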
- - Args: - treated: - Test subset DataFrame to be transformed - untreated: - Control subset DataFrame to be transformed - weights: - Dict with a weight for each feature. Each weight defaults to 1 - - Returns: - A tuple of transformed numpy arrays for treated and untreated data respectively - """ - xc = untreated.to_numpy() - xt = treated.to_numpy() - - cov = conditional_covariance(xc, xt) - - epsilon = 1e-3 - cov = cov + np.eye(cov.shape[0]) * epsilon - - L = np.linalg.cholesky(cov) - mahalanobis_transform = np.linalg.inv(L) - if weights is not None: - features = treated.columns - w_list = np.array([weights[col] if col in weights.keys() else 1 for col in features]) - w_matrix = np.sqrt(np.diag(w_list / w_list.sum())) - mahalanobis_transform = np.dot(w_matrix, mahalanobis_transform) - yc = np.dot(xc, mahalanobis_transform.T) - yt = np.dot(xt, mahalanobis_transform.T) - - return yt.copy(order="C").astype("float32"), yc.copy(order="C").astype("float32") - - -def calc_atx_var(vars_c: np.ndarray, vars_t: np.ndarray, weights_c: np.ndarray, weights_t: np.ndarray) -> float: - """Calculates the variance of an average treatment effect estimate (ATT, ATC, or ATE). - - Args: - vars_c: - Control group variance - vars_t: - Treatment group variance - weights_c: - Control group weights - weights_t: - Treatment group weights - - Returns: - The calculated variance - - """ - N_c, N_t = len(vars_c), len(vars_t) - summands_c = weights_c ** 2 * vars_c - summands_t = weights_t ** 2 * vars_t - - return summands_t.sum() / N_t ** 2 + summands_c.sum() / N_c ** 2 - - -def calc_atc_se(vars_c: np.ndarray, vars_t: np.ndarray, scaled_counts_t: np.ndarray) -> float: - """Calculates Average Treatment Effect for the control group (ATC) standard error. - - Args: - vars_c: - Control group variance - vars_t: - Treatment group variance - scaled_counts_t: - Scaled counts for treatment group - - Returns: - The calculated ATC standard error - """ - N_c, N_t = len(vars_c), len(vars_t) - weights_c = np.ones(N_c) - weights_t = (N_t / N_c) * scaled_counts_t - - var = calc_atx_var(vars_c, vars_t, weights_c, weights_t) - - return np.sqrt(var) - - -def conditional_covariance(xc, xt): - """Calculates covariance according to the Imbens-Rubin model.""" - cov_c = np.cov(xc, rowvar=False, ddof=0) - cov_t = np.cov(xt, rowvar=False, ddof=0) - cov = (cov_c + cov_t) / 2 - - return cov - - -def calc_att_se(vars_c: np.ndarray, vars_t: np.ndarray, scaled_counts_c: np.ndarray) -> float: - """Calculates Average Treatment Effect for the treated (ATT) standard error. - - Args: - vars_c: - Control group variance - vars_t: - Treatment group variance - scaled_counts_c: - Scaled counts for control group - - Returns: - The calculated ATT standard error - """ - N_c, N_t = len(vars_c), len(vars_t) - weights_c = (N_c / N_t) * scaled_counts_c - weights_t = np.ones(N_t) - - var = calc_atx_var(vars_c, vars_t, weights_c, weights_t) - - return np.sqrt(var) - - -def calc_ate_se( - vars_c: np.ndarray, vars_t: np.ndarray, scaled_counts_c: np.ndarray, scaled_counts_t: np.ndarray -) -> float: - """Calculates the Average Treatment Effect (ATE) standard error.
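The whitening in `_transform_to_np` can be checked in isolation: after multiplying by the inverse Cholesky factor, plain Euclidean distance equals the Mahalanobis distance in the original space. A minimal sketch on random data:

```python
import numpy as np

rng = np.random.default_rng(1)
xc = rng.normal(size=(200, 3))  # control features
xt = rng.normal(size=(150, 3))  # treated features

cov = conditional_covariance(xc, xt) + np.eye(3) * 1e-3  # same regularization
L_inv = np.linalg.inv(np.linalg.cholesky(cov))

yc, yt = xc @ L_inv.T, xt @ L_inv.T
# ||yt[i] - yc[j]|| is now the Mahalanobis distance between xt[i] and xc[j].
```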
- - Args: - vars_c: - Control group variance - vars_t: - Treatment group variance - scaled_counts_c: - Scaled counts for control group - scaled_counts_t: - Scaled counts for treatment group - - Returns: - The calculated ATE standard error - """ - N_c, N_t = len(vars_c), len(vars_t) - N = N_c + N_t - weights_c = (N_c / N) * (1 + scaled_counts_c) - weights_t = (N_t / N) * (1 + scaled_counts_t) - - var = calc_atx_var(vars_c, vars_t, weights_c, weights_t) - - return np.sqrt(var) - - -def pval_calc(z): - """Calculates p-value of the normal cumulative distribution function based on z. - - Args: - z: - The z-score for which the p-value is calculated - - Returns: - The calculated p-value rounded to 2 decimal places - - """ - return round(2 * (1 - norm.cdf(abs(z))), 2) - - -def scaled_counts(N: int, matches: np.ndarray, silent: bool = True) -> np.ndarray: - """Counts the number of times each subject has appeared as a match. - - In the case of multiple matches, each subject only gets partial credit. - - Args: - N: - The length of original treated or control group - matches: - A numpy array of matched indexes from control or treated group - silent: - If true logger in info mode - - Returns: - An array representing the number of times each subject has appeared as a match - """ - s_counts = np.zeros(N) - - for matches_i in matches: - scale = 1 / len(matches_i) - for match in matches_i: - s_counts[match] += scale - - if silent: - logger.debug(f"Calculated the number of times each subject has appeared as a match: {len(s_counts)}") - else: - logger.info(f"Calculated the number of times each subject has appeared as a match: {len(s_counts)}") - - return s_counts - - -def bias_coefs(matches, Y_m, X_m): - """Computes Ordinary Least Squares (OLS) coefficient in bias correction regression. - - Constructs data for regression by including (possibly multiple times) every - observation that has appeared in the matched sample. - - Args: - matches: - A numpy array of matched indexes - Y_m: - The dependent variable values - X_m: - The independent variable values - - Returns: - The calculated OLS coefficients excluding the intercept - """ - flat_idx = np.concatenate(matches) - N, K = len(flat_idx), X_m.shape[1] - - Y = Y_m[flat_idx] - X = np.empty((N, K + 1)) - X[:, 0] = 1 # intercept term - X[:, 1:] = X_m[flat_idx] - - return np.linalg.lstsq(X, Y)[0][1:] # don't need intercept coef - - -def bias(X, X_m, coefs): - """Computes bias correction term. - - It is approximated by the dot product of the - matching discrepancy (i.e., X-X_matched) and the - coefficients from the bias correction regression. - - Args: - X: - The original independent variable values - X_m: - The matched independent variable values - coefs: - The coefficients from the bias correction regression - - Returns: - The calculated bias correction terms for each observation - """ - bias_list = [(X_j - X_i).dot(coefs) for X_i, X_j in zip(X, X_m)] - - return np.array(bias_list) - - -def f3(index: np.array, dist: np.array, k: int) -> list: - """Function returns list of n matches with equal distance in case n>1. 
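`bias_coefs` and `bias` amount to one OLS fit on the matched sample followed by a dot product with the matching discrepancy. A toy rendition of the same computation (synthetic data, not from the library):

import numpy as np

rng = np.random.default_rng(2)
X_m = rng.normal(size=(100, 2))                 # matched covariate rows
Y_m = X_m @ np.array([2.0, -1.0]) + rng.normal(scale=0.1, size=100)
matches = [np.array([i]) for i in range(100)]   # 1:1 matches for simplicity

flat_idx = np.concatenate(matches)
X = np.column_stack([np.ones(len(flat_idx)), X_m[flat_idx]])  # prepend intercept
coefs = np.linalg.lstsq(X, Y_m[flat_idx], rcond=None)[0][1:]  # drop intercept coef

# the correction term is the matching discrepancy dotted with the coefficients
x_i, x_j = X_m[0], X_m[1]
correction = (x_j - x_i) @ coefs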
-
-    Args:
-        index:
-            Array of matched indexes
-        dist:
-            Array of matched distances
-        k:
-            k nearest neighbors with the same distance
-
-    Returns:
-        Array of indexes for k neighbors with the same distance
-    """
-    new = []
-    for i, val in enumerate(index):
-        eq_dist = sorted(set(dist[i]))
-        unit = []
-        for d in eq_dist[:k]:
-            unit.append(val[np.where(dist[i] == d)[0]])
-        new.append(np.concatenate(unit))
-    return new
diff --git a/hypex/algorithms/no_replacement_matching.py b/hypex/algorithms/no_replacement_matching.py
deleted file mode 100644
index 30e63e2b..00000000
--- a/hypex/algorithms/no_replacement_matching.py
+++ /dev/null
@@ -1,162 +0,0 @@
-"""Realization of matching without replacement."""
-from typing import Tuple
-
-import numpy as np
-import pandas as pd
-from scipy.optimize import linear_sum_assignment
-from scipy.spatial import distance
-
-from .faiss_matcher import conditional_covariance
-
-
-class MatcherNoReplacement:
-    """Matching groups with no replacement.
-
-    Realized by optimizing the linear sum of distances between pairs of treatment and
-    control samples.
-    """
-
-    def __init__(self, X: pd.DataFrame, a: pd.Series, weights: dict = None):
-        """Initialize matching.
-
-        Args:
-            X: features dataframe
-            a: series of treatment values
-            weights: weights for numeric columns, used to increase matching quality.
-        """
-        self.treatment = a
-        self.X = X
-        self.weights = weights
-
-    def match(self):
-        """Runs matching with no replacement.
-
-        Returns:
-            Dataframe of matched indexes.
-        """
-        matches = {}
-        cov = conditional_covariance(self.X[self.treatment == 1].values, self.X[self.treatment == 0].values)
-        distance_matrix = self._get_distance_matrix(self.X[self.treatment == 1], self.X[self.treatment == 0], cov)
-        source_array, neighbor_array_indices, distances = optimally_match_distance_matrix(distance_matrix)
-        source_df = self.X[self.treatment == 1].iloc[np.array(source_array)]
-        target_df = self.X[self.treatment == 0].iloc[np.array(neighbor_array_indices)]
-
-        matches[1] = self.create_match_df(self.treatment, source_df, target_df, distances)
-        matches[0] = self.create_match_df(self.treatment, target_df, source_df, distances)
-
-        match_df = pd.concat(matches, sort=True)
-        return match_df
-
-    def create_match_df(
-        self, base_series: pd.Series, source_df: pd.DataFrame, target_df: pd.DataFrame, distances: list
-    ) -> pd.DataFrame:
-        """Creates the matching dataframe.
-
-        Args:
-            base_series: series of treatment values.
-            source_df: dataframe of source indexes.
-            target_df: dataframe of target indexes.
-            distances: matrix of calculated distances.
-
-        Returns:
-            Matched dataframe of indexes.
-        """
-        match_sub_df = pd.DataFrame(
-            index=base_series.index,
-            columns=[
-                "matches",
-                "distances",
-            ],
-            data=base_series.apply(lambda x: pd.Series([[], []])).values,
-            dtype="object",
-        )
-
-        # matching from source to target: read distances
-        match_sub_df.loc[source_df.index] = pd.DataFrame(
-            data=dict(
-                matches=[[tidx] for tidx in target_df.index],
-                distances=distances,
-            ),
-            index=source_df.index,
-        )
-
-        # matching from target to target: fill with zeros
-        match_sub_df.loc[target_df.index] = pd.DataFrame(
-            data=dict(
-                matches=[[tidx] for tidx in target_df.index],
-                distances=[[0]] * len(distances),
-            ),
-            index=target_df.index,
-        )
-        return match_sub_df
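`match` delegates the actual pairing to `optimally_match_distance_matrix`, i.e. to the Hungarian algorithm over a precomputed distance matrix. In isolation the idea looks like this (a sketch with Euclidean distances for brevity, whereas the class uses Mahalanobis):

import numpy as np
from scipy.optimize import linear_sum_assignment
from scipy.spatial import distance

rng = np.random.default_rng(3)
treated = rng.normal(size=(5, 2))
control = rng.normal(size=(8, 2))

d = distance.cdist(treated, control)   # pairwise distance matrix
rows, cols = linear_sum_assignment(d)  # each treated unit gets a distinct control
pairs = list(zip(rows, cols))          # no control unit is reused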
-
-    def _get_metric_dict(self, cov: np.ndarray) -> dict:
-        """Calculates the correct feature space and generates the metric dict for the cdist calculation.
-
-        Args:
-            cov: Matrix of covariances.
-
-        Returns:
-            Metric dictionary
-        """
-        metric_dict = dict(metric="mahalanobis")
-        mahalanobis_transform = np.linalg.inv(cov)
-        if self.weights is not None:
-            features = self.X.columns
-            w_list = np.array([self.weights[col] if col in self.weights.keys() else 1 for col in features])
-            w_matrix = np.sqrt(np.diag(w_list / w_list.sum()))
-            mahalanobis_transform = np.dot(w_matrix, mahalanobis_transform)
-
-        metric_dict["VI"] = mahalanobis_transform
-        return metric_dict
-
-    def _get_distance_matrix(self, source_df: pd.DataFrame, target_df: pd.DataFrame, cov: np.ndarray) -> np.ndarray:
-        """Creates the distance matrix for the no-replacement match.
-
-        Combines the metric and source/target data into a
-        precalculated distance matrix which can be passed to
-        scipy.optimize.linear_sum_assignment.
-
-        Args:
-            source_df: source feature dataframe.
-            target_df: target feature dataframe.
-            cov: matrix of covariances.
-
-        Returns:
-            Matrix of distances.
-        """
-        cdist_args = dict(XA=_ensure_array_columnlike(source_df.values), XB=_ensure_array_columnlike(target_df.values))
-        cdist_args.update(self._get_metric_dict(cov))
-        distance_matrix = distance.cdist(**cdist_args)
-
-        return distance_matrix
-
-
-def optimally_match_distance_matrix(distance_matrix: np.ndarray) -> Tuple[np.ndarray, np.ndarray, list]:
-    """Finds the optimal neighbors with no replacement.
-
-    Args:
-        distance_matrix: matrix of distances.
-
-    Returns:
-        - indexes of the source dataframe.
-        - array of optimal neighbors for the source array.
-        - distances of the optimal neighbors.
-    """
-    source_array, neighbor_array_indices = linear_sum_assignment(distance_matrix)
-    distances = [[distance_matrix[s_idx, t_idx]] for s_idx, t_idx in zip(source_array, neighbor_array_indices)]
-    return source_array, neighbor_array_indices, distances
-
-
-def _ensure_array_columnlike(target_array: np.ndarray) -> np.ndarray:
-    """Checks whether the array is column-like and reshapes it if it is not.
-
-    Args:
-        target_array: the array to check.
-
-    Returns:
-        Column-like target array.
-    """
-    if len(target_array.shape) < 2 or target_array.shape[1] == 1:
-        target_array = target_array.reshape(-1, 1)
-    return target_array
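The metric dict built by `_get_metric_dict` works because `scipy.spatial.distance.cdist` accepts an inverse covariance matrix through the `VI` keyword when `metric="mahalanobis"`. Roughly (toy data, not library code):

import numpy as np
from scipy.spatial import distance

rng = np.random.default_rng(4)
xa, xb = rng.normal(size=(4, 3)), rng.normal(size=(6, 3))
cov = np.cov(np.vstack([xa, xb]), rowvar=False)

d = distance.cdist(xa, xb, metric="mahalanobis", VI=np.linalg.inv(cov))  # shape (4, 6)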
- """ - if len(target_array.shape) < 2 or target_array.shape[1] == 1: - target_array = target_array.reshape(-1, 1) - return target_array diff --git a/hypex/matcher.py b/hypex/matcher.py deleted file mode 100644 index a19b4333..00000000 --- a/hypex/matcher.py +++ /dev/null @@ -1,580 +0,0 @@ -"""Base Matcher class.""" -import logging -import pickle - -import numpy as np -import pandas as pd -from typing import Union -from tqdm.auto import tqdm - -from .algorithms.faiss_matcher import FaissMatcher -from .algorithms.no_replacement_matching import MatcherNoReplacement -from .selectors.feature_selector import FeatureSelector -from .selectors.spearman_filter import SpearmanFilter -from .selectors.outliers_filter import OutliersFilter -from .selectors.base_filtration import const_filtration, nan_filtration -from .utils.validators import random_feature -from .utils.validators import random_treatment -from .utils.validators import subset_refuter -from .utils.validators import test_significance - - -REPORT_FEAT_SELECT_DIR = "report_feature_selector" -REPORT_PROP_MATCHER_DIR = "report_matcher" -NAME_REPORT = "lama_interactive_report.html" -N_THREADS = 1 -N_FOLDS = 4 -RANDOM_STATE = 123 -TEST_SIZE = 0.2 -TIMEOUT = 600 -VERBOSE = 2 -USE_ALGOS = ["lgb"] -PROP_SCORES_COLUMN = "prop_scores" -GENERATE_REPORT = True -SAME_TARGET_THRESHOLD = 0.7 -OUT_INTER_COEFF = 1.5 -OUT_MODE_PERCENT = True -OUT_MIN_PERCENT = 0.02 -OUT_MAX_PERCENT = 0.98 - -logger = logging.getLogger("hypex") -console_out = logging.StreamHandler() -logging.basicConfig( - handlers=(console_out,), - format="[%(asctime)s | %(name)s | %(levelname)s]: %(message)s", - datefmt="%d.%m.%Y %H:%M:%S", - level=logging.INFO, -) - - -class Matcher: - """Class for compile full pipeline of Matching in Causal Inference task. - - Matcher steps: - - Read, analyze data - - Feature selection via LightAutoML - - Converting a dataset with features to another space via Cholesky decomposition - In the new space, the distance L2 becomes equivalent to the Mahalanobis distance. - This allows us to use faiss to search for nearest objects, which can search only by L2 metric, - but without violating the methodology of matching, - for which it is important to count by the Mahalanobis distance - - Finding the nearest neighbors for each unit (with duplicates) using faiss. - For each of the control group, neighbors from the target group are matched and vice versa. - - Calculation bias - - Creating matched df (Wide df with pairs) - - Calculation metrics: ATE, ATT, ATC, p-value, and сonfidence intervals - - Calculation quality: PS-test, KS test, SMD test - - Returns metrics as dataframe, quality results as dict of df's and df_matched - - After receiving the result, the result should be validated using :func:`~hypex.matcher.Matcher.validate_result` - - Example: - Common usecase - base pipeline for matching - - >>> # Base info - >>> treatment = "treatment" # Column name with info about 'treatment' 0 or 1 - >>> target = "target" # Column name with target - >>> - >>> # Optional - >>> info_col = ["user_id", 'address'] # Columns that will not participate in the match and are informative. 
-        >>> group_col = "CatCol" # Column name for strict comparison (for a categorical feature)
-        >>>
-        >>> # Matching
-        >>> model = Matcher(data, outcome=target, treatment=treatment, info_col=info_col, group_col=group_col)
-        >>> features = model.lama_feature_select() # Feature selection via lama
-        >>> results, quality, df_matched = model.estimate(features=some_features) # Performs matching
-        >>>
-        >>> model.validate_result()
-    """
-
-    def __init__(
-        self,
-        input_data: pd.DataFrame,
-        treatment: str,
-        outcome: Union[str, list] = None,
-        outcome_type: str = "numeric",
-        group_col: str = None,
-        info_col: list = None,
-        weights: dict = None,
-        base_filtration: bool = False,
-        generate_report: bool = GENERATE_REPORT,
-        report_feat_select_dir: str = REPORT_FEAT_SELECT_DIR,
-        timeout: int = TIMEOUT,
-        n_threads: int = N_THREADS,
-        n_folds: int = N_FOLDS,
-        verbose: bool = VERBOSE,
-        use_algos: list = None,
-        same_target_threshold: float = SAME_TARGET_THRESHOLD,
-        interquartile_coeff: float = OUT_INTER_COEFF,
-        drop_outliers_by_percentile: bool = OUT_MODE_PERCENT,
-        min_percentile: float = OUT_MIN_PERCENT,
-        max_percentile: float = OUT_MAX_PERCENT,
-        n_neighbors: int = 1,
-        silent: bool = True,
-        pbar: bool = True,
-    ):
-        """Initialize the Matcher object.
-
-        Args:
-            input_data:
-                Input dataframe
-            outcome:
-                Target column
-            treatment:
-                Column that determines the control and test groups
-            outcome_type:
-                Type of values in the target column. Defaults to "numeric"
-            group_col:
-                Column for grouping. Defaults to None.
-            info_col:
-                Columns with ids, dates or metadata that take no part in the calculations. Defaults to None
-            weights:
-                Weights for numeric columns, used to increase matching quality via feature weighting.
-                Defaults to None (all features have the same weight equal to 1). Example: {'feature_1': 10}
-            base_filtration:
-                Whether to apply base filtration of features in order to remove constant or almost constant
-                columns, bool. Defaults to False.
-            generate_report:
-                Flag to create a report. Defaults to True
-            report_feat_select_dir:
-                Folder for report files. Defaults to "report_feature_selector"
-            timeout:
-                Time limit for LAMA execution. Defaults to 600
-            n_threads:
-                Maximum number of threads. Defaults to 1
-            n_folds:
-                Number of folds for cross-validation. Defaults to 4
-            verbose:
-                Flag to show the process stages. Defaults to 2
-            use_algos:
-                List of names of LAMA algorithms for feature selection. Defaults to ["lgb"]
-            same_target_threshold:
-                Threshold for the correlation coefficient filter (Spearman). Defaults to 0.7
-            interquartile_coeff:
-                Coefficient for dropping outliers by the interquartile range. Defaults to 1.5
-            drop_outliers_by_percentile:
-                Flag to drop outliers by custom percentiles. Defaults to True
-            min_percentile:
-                Minimum percentile for dropping outliers. Defaults to 0.02
-            max_percentile:
-                Maximum percentile for dropping outliers. Defaults to 0.98
-            n_neighbors:
-                Number of neighbors to match (in fact you may see more than n matches, as every match may have
-                more than one neighbor with the same distance). Defaults to 1.
- silent: - Write logs in debug mode - pbar: - Display progress bar while get index - """ - if use_algos is None: - use_algos = USE_ALGOS - self.input_data = input_data - if outcome is None: - outcome = list() - self.outcomes = outcome if type(outcome) == list else [outcome] - self.treatment = treatment - self.group_col = group_col - self.info_col = info_col - self.outcome_type = outcome_type - self.weights = weights - self.generate_report = generate_report - self.report_feat_select_dir = report_feat_select_dir - self.timeout = timeout - self.n_threads = n_threads - self.n_folds = n_folds - self.verbose = verbose - self.use_algos = use_algos - self.same_target_threshold = same_target_threshold - self.interquartile_coeff = interquartile_coeff - self.mode_percentile = drop_outliers_by_percentile - self.min_percentile = min_percentile - self.max_percentile = max_percentile - self.base_filtration = base_filtration - self.features_importance = None - self.matcher = None - self.val_dict = None - self.pval_dict = None - self.new_treatment = None - self.validate = None - self.dropped_features = [] - self.n_neighbors = n_neighbors - self.silent = silent - self.pbar = pbar - self._preprocessing_data() - - def _convert_categorical_to_dummy(self): - """Converts categorical variables to dummy variables. - - Returns: - Data with categorical variables converted to dummy variables. - """ - info_col = self.info_col if self.info_col is not None else [] - group_col = [self.group_col] if self.group_col is not None else [] - - columns_to_drop = info_col + group_col - if columns_to_drop is not None: - data = self.input_data.drop(columns=columns_to_drop) - else: - data = self.input_data - dummy_data = pd.get_dummies(data, drop_first=True, dtype=np.uint8) - return dummy_data - - def _preprocessing_data(self): - """Converts categorical features into dummy variables.""" - info_col = self.info_col if self.info_col is not None else [] - group_col = [self.group_col] if self.group_col is not None else [] - columns_to_drop = info_col + group_col + self.outcomes + [self.treatment] - if self.base_filtration: - filtered_features = nan_filtration(self.input_data.drop(columns=columns_to_drop)) - self.dropped_features = [f for f in self.input_data.columns if f not in filtered_features + columns_to_drop] - self.input_data = self.input_data[filtered_features + columns_to_drop] - nan_counts = self.input_data.isna().sum().sum() - if nan_counts != 0: - self._log(f"Number of NaN values filled with zeros: {nan_counts}", silent=False) - self.input_data = self.input_data.fillna(0) - - if self.group_col is not None: - group_col = self.input_data[[self.group_col]] - if self.info_col is not None: - info_col = self.input_data[self.info_col] - - self.input_data = self._convert_categorical_to_dummy() - if self.group_col is not None: - self.input_data = pd.concat([self.input_data, group_col], axis=1) - - if self.info_col is not None: - self.input_data = pd.concat([self.input_data, info_col], axis=1) - - if self.base_filtration: - filtered_features = const_filtration(self.input_data.drop(columns=columns_to_drop)) - self.dropped_features = np.concatenate( - ( - self.dropped_features, - [f for f in self.input_data.columns if f not in filtered_features + columns_to_drop], - ) - ) - self.input_data = self.input_data[filtered_features + columns_to_drop] - - self._log("Categorical features turned into dummy") - - def _apply_filter(self, filter_class, *filter_args): - """Applies a filter to the input data. 
- - Args: - filter_class: - The class of the filter to apply. - *filter_args: - Arguments to pass to the filter class. - """ - filter_instance = filter_class(*filter_args) - self.input_data = filter_instance.perform_filter(self.input_data) - - def _spearman_filter(self): - """Applies a filter by dropping columns correlated with the outcome column. - - This method uses the Spearman filter to eliminate features from the dataset - that are highly correlated with the outcome columns, based on a pre-set threshold - """ - self._log("Applying filter by spearman test - drop columns correlated with outcome") - self._apply_filter(SpearmanFilter, self.outcomes[0], self.treatment, self.same_target_threshold) - - def _outliers_filter(self): - """Removes outlier values from the dataset. - - This method employs an OutliersFilter. If `drop_outliers_by_percentile` is True, - it retains only the values between the min and max percentiles - If `drop_outliers_by_percentile` is False, it retains only the values between 2nd and 98th percentiles - """ - self._log( - f"Applying filter of outliers\n" - f"interquartile_coeff={self.interquartile_coeff}\n" - f"mode_percentile={self.mode_percentile}\n" - f"min_percentile={self.min_percentile}\n" - f"max_percentile={self.max_percentile}" - ) - - self._apply_filter( - OutliersFilter, self.interquartile_coeff, self.mode_percentile, self.min_percentile, self.max_percentile - ) - - def match_no_rep(self, threshold: float = 0.1) -> pd.DataFrame: - """Matching groups with no replacement. - - It's done by optimizing the linear sum of - distances between pairs of treatment and control samples. - - Args: - threshold: caliper for minimum deviation between test and control groups. in case weights is not None. - - Returns: - Matched dataframe with no replacements. - """ - a = self.input_data[self.treatment] - X = self.input_data.drop(columns=self.treatment) - if self.info_col is not None: - X = X.drop(columns=self.info_col) - - index_matched = MatcherNoReplacement(X, a, self.weights).match() - filtred_matches = index_matched.loc[1].iloc[self.input_data[a == 1].index].matches[index_matched.loc[1].iloc[self.input_data[a == 1].index].matches.apply(lambda x: x != [])] - - if self.weights is not None: - weighted_features = [f for f in self.weights.keys()] - index_dict = dict() - for w in weighted_features: - source = self.input_data.loc[np.concatenate(filtred_matches.values)][w].values - target = self.input_data.loc[filtred_matches.index.to_list()][w].values - index = abs(source - target) <= abs(source) * threshold - index_dict.update({w: index}) - index_filtered = sum(index_dict.values()) == len(self.weights) - matched_data = pd.concat( - [self.input_data.loc[filtred_matches.index.to_list()].iloc[index_filtered], - self.input_data.loc[np.concatenate(filtred_matches.values)].iloc[index_filtered]] - ) - else: - matched_data = pd.concat([self.input_data.loc[filtred_matches.index.to_list()], - self.input_data.loc[np.concatenate(filtred_matches.values)]]) - return matched_data - - def lama_feature_select(self) -> pd.DataFrame: - """Calculates the importance of each feature. 
-
-        This method uses FeatureSelector to rank the importance of each feature in the dataset.
-        The features are then sorted by their importance, with the most important feature first
-
-        Returns:
-            The feature importances, sorted in descending order
-        """
-        self._log("Counting feature importance")
-
-        feat_select = FeatureSelector(
-            outcome=self.outcomes[0],
-            outcome_type=self.outcome_type,
-            treatment=self.treatment,
-            timeout=self.timeout,
-            n_threads=self.n_threads,
-            n_folds=self.n_folds,
-            verbose=self.verbose,
-            generate_report=self.generate_report,
-            report_dir=self.report_feat_select_dir,
-            use_algos=self.use_algos,
-        )
-        df = self.input_data if self.group_col is None else self.input_data.drop(columns=self.group_col)
-
-        if self.info_col is not None:
-            df = df.drop(columns=self.info_col)
-
-        features = feat_select.perform_selection(df=df)
-        if self.group_col is None:
-            self.features_importance = features
-        else:
-            self.features_importance = features.append(
-                {"Feature": self.group_col, "Importance": features.Importance.max()}, ignore_index=True
-            )
-        return self.features_importance.sort_values("Importance", ascending=False)
-
-    def _create_faiss_matcher(self, df=None, validation=None):
-        """Creates a FaissMatcher object.
-
-        Args:
-            df:
-                The dataframe to use. If None, uses self.input_data.
-            validation:
-                Whether to use the matcher for validation. If None, falls back to the current validation setting
-        """
-        if df is None:
-            df = self.input_data
-        self.matcher = FaissMatcher(
-            df,
-            self.outcomes,
-            self.treatment,
-            info_col=self.info_col,
-            weights=self.weights,
-            features=self.features_importance,
-            group_col=self.group_col,
-            validation=validation,
-            n_neighbors=self.n_neighbors,
-            pbar=False if validation else self.pbar,
-        )
-
-    def _perform_validation(self):
-        """Performs validation using the FaissMatcher."""
-        if self.group_col is None:
-            sim = self.matcher.match()
-        else:
-            sim = self.matcher.group_match()
-        for key in self.val_dict.keys():
-            self.val_dict[key].append(sim[key][0])
-
-    def _log(self, message, silent=None):
-        """Logs a message at the appropriate level.
-
-        Args:
-            message:
-                The message to log.
-            silent:
-                If True, the message is logged at debug level only
-        """
-        if silent is None:
-            silent = self.silent
-        if silent:
-            logger.debug(message)
-        else:
-            logger.info(message)
-
-    def _matching(self) -> tuple:
-        """Performs matching, considering the presence of groups.
-
-        Returns:
-            Results of matching and matching quality metrics
-        """
-        self._create_faiss_matcher()
-        self._log("Applying matching")
-
-        self.results, df_matched = self.matcher.match()
-
-        self.quality_result = self.matcher.matching_quality(df_matched)
-
-        return self.results, self.quality_result, df_matched
-
-    def validate_result(
-        self, refuter: str = "random_feature", effect_type: str = "ate", n_sim: int = 10, fraction: float = 0.8
-    ) -> dict:
-        """Validates the estimated ATE (Average Treatment Effect).
-
-        Validates the estimated effect:
-        1) by replacing the real treatment with a random placebo treatment.
-        The estimated effect must drop to zero, p-val > 0.05;
-        2) by adding a random feature (`random_feature`). The estimated effect shouldn't change
-        significantly, p-val < 0.05;
-        3) by estimating the effect on a subset of the data (default fraction is 0.8). The estimated effect
-        shouldn't change significantly, p-val < 0.05.
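The placebo check in (1) is conceptually tiny: shuffle the treatment flags and re-estimate. A hypothetical stand-alone sketch (the helper name is invented for illustration and is not the library API):

import numpy as np
import pandas as pd

def placebo_effect(df: pd.DataFrame, treatment: str, outcome: str, rng: np.random.Generator) -> float:
    """Naive difference in means under a shuffled (placebo) treatment."""
    placebo = df.copy()
    placebo[treatment] = rng.permutation(placebo[treatment].values)
    treated_mean = placebo.loc[placebo[treatment] == 1, outcome].mean()
    control_mean = placebo.loc[placebo[treatment] == 0, outcome].mean()
    return treated_mean - control_mean

# repeated over n_sim draws, the placebo effects should hover around zero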
- - Args: - refuter: - Refuter type (`random_treatment`, `random_feature`, `subset_refuter`) - effect_type: - Which effect to validate (`ate`, `att`, `atc`) - n_sim: - Number of simulations - fraction: - Subset fraction for subset refuter only - - Returns: - Dictionary of outcome_name (mean_effect on validation, p-value) - """ - if self.silent: - logger.debug("Applying validation of result") - else: - logger.info("Applying validation of result") - - self.val_dict = {k: [] for k in self.outcomes} - self.pval_dict = dict() - - effect_dict = {"ate": 0, "atc": 1, "att": 2} - - assert effect_type in effect_dict.keys() - - for i in tqdm(range(n_sim)): - if refuter in ["random_treatment", "random_feature"]: - if refuter == "random_treatment": - self.input_data, orig_treatment, self.validate = random_treatment(self.input_data, self.treatment) - elif refuter == "random_feature": - self.input_data, self.validate = random_feature(self.input_data) - if self.features_importance is not None and i == 0: - self.features_importance.append("random_feature") - - self.matcher = FaissMatcher( - self.input_data, - self.outcomes, - self.treatment, - info_col=self.info_col, - features=self.features_importance, - group_col=self.group_col, - validation=self.validate, - n_neighbors=self.n_neighbors, - pbar=False, - ) - elif refuter == "subset_refuter": - df, self.validate = subset_refuter(self.input_data, self.treatment, fraction) - self.matcher = FaissMatcher( - df, - self.outcomes, - self.treatment, - info_col=self.info_col, - features=self.features_importance, - group_col=self.group_col, - validation=self.validate, - n_neighbors=self.n_neighbors, - pbar=False, - ) - else: - logger.error("Incorrect refuter name") - raise NameError( - "Incorrect refuter name! Available refuters: `random_feature`, `random_treatment`, `subset_refuter`" - ) - - if self.group_col is None: - sim = self.matcher.match() - else: - sim = self.matcher.group_match() - - for key in self.val_dict.keys(): - self.val_dict[key].append(sim[key][0]) - - for outcome in self.outcomes: - self.pval_dict.update({outcome: [np.mean(self.val_dict[outcome])]}) - self.pval_dict[outcome].append( - test_significance( - self.results.query("outcome==@outcome").loc[effect_type.upper()]["effect_size"], - self.val_dict[outcome], - ) - ) - if refuter == "random_treatment": - self.input_data[self.treatment] = orig_treatment - elif refuter == "random_feature": - self.input_data = self.input_data.drop(columns="random_feature") - if self.features_importance is not None: - self.features_importance.remove("random_feature") - - return self.pval_dict - - def estimate(self, features: list = None) -> tuple: - """Performs matching via Mahalanobis distance. - - Args: - features: - List or feature_importances from LAMA of features for matching - - Returns: - Results of matching and matching quality metrics - """ - if features is not None: - self.features_importance = features - return self._matching() - - def save(self, filename): - """Save the object to a file using pickle. - - This method serializes the object and writes it to a file - - Args: - filename: - The name of the file to write to. - """ - with open(filename, "wb") as f: - pickle.dump(self, f) - - @classmethod - def load(cls, filename): - """Load an object from a file. - - This method reads a file and deserializes the object from it - - Args: - filename: - The name of the file to read from. 
-
-        Returns:
-            The deserialized object
-        """
-        with open(filename, "rb") as f:
-            return pickle.load(f)
diff --git a/hypex/selectors/__init__.py b/hypex/selectors/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/hypex/selectors/base_filtration.py b/hypex/selectors/base_filtration.py
deleted file mode 100644
index ef8e55fd..00000000
--- a/hypex/selectors/base_filtration.py
+++ /dev/null
@@ -1,48 +0,0 @@
-"""Functions for base filtration of data."""
-import pandas as pd
-import numpy as np
-
-
-def const_filtration(X: pd.DataFrame, threshold: float = 0.95) -> list:
-    """Removes features that consist of a constant value in 95% of cases or more.
-
-    Args:
-        X: related dataset
-        threshold: constant fill rate, default is 0.95
-
-    Returns:
-        List of filtered columns
-    """
-    is_const = pd.Series(0, index=X.columns, dtype=np.dtype(bool))
-    for col in X.columns:
-        # NaNs are not counted using unique (since np.nan != np.nan). Fill them with a unique value:
-        cur_col = X.loc[:, col]
-        cur_col.loc[~np.isfinite(cur_col)] = cur_col.max() + 1
-        # Get values' frequency:
-        freqs = cur_col.value_counts(normalize=True)
-        is_const[col] = np.any(freqs > threshold)
-
-    selected_features = ~is_const
-    if np.sum(selected_features) == 0:
-        raise AssertionError("All features were removed by constant filtration.")
-    else:
-        return X.loc[:, selected_features].columns.to_list()
-
-
-def nan_filtration(X: pd.DataFrame, threshold: float = 0.8):
-    """Removes features that consist of NaN values in more than 80% of cases.
-
-    Args:
-        X: related dataset
-        threshold: NaN fill rate, default is 0.8
-
-    Returns:
-        List of filtered columns
-    """
-    nan_freqs = np.mean(pd.isnull(X), axis=0)
-    is_sparse = nan_freqs > threshold
-    selected_features = ~is_sparse
-    if np.sum(selected_features) == 0:
-        raise AssertionError("All features were removed by nan filtration.")
-    else:
-        return X.loc[:, selected_features].columns.to_list()
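Both filters reduce to simple column statistics; for instance, the NaN-share rule can be verified by hand (a sketch with a toy frame):

import numpy as np
import pandas as pd

X = pd.DataFrame({"ok": [1, 2, 3, 4, 5], "sparse": [np.nan, np.nan, np.nan, np.nan, 1.0]})
nan_share = X.isna().mean()                        # same as np.mean(pd.isnull(X), axis=0)
kept = nan_share[nan_share <= 0.8].index.tolist()  # "sparse" sits exactly at 0.8 and survives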
diff --git a/hypex/selectors/feature_selector.py b/hypex/selectors/feature_selector.py
deleted file mode 100644
index 036018c7..00000000
--- a/hypex/selectors/feature_selector.py
+++ /dev/null
@@ -1,102 +0,0 @@
-"""Feature selection class using LAMA."""
-import logging
-from typing import List
-
-import pandas as pd
-
-logger = logging.getLogger("lama_feature_selector")
-console_out = logging.StreamHandler()
-logging.basicConfig(
-    handlers=(console_out,),
-    format="[%(asctime)s | %(name)s | %(levelname)s]: %(message)s",
-    datefmt="%d.%m.%Y %H:%M:%S",
-    level=logging.INFO,
-)
-
-
-class FeatureSelector:
-    """Class of the LAMA feature selector. Selects the top features. By default, uses LGBM.
-    # TODO: write some feature selector"""
-
-    def __init__(
-        self,
-        outcome: str,
-        outcome_type: str,
-        treatment: str,
-        timeout: int,
-        n_threads: int,
-        n_folds: int,
-        verbose: bool,  # unused
-        generate_report: bool,
-        report_dir: str,
-        use_algos: List[str],
-    ):
-        """Initialize the LamaFeatureSelector.
-
-        Args:
-            outcome:
-                The target column
-            outcome_type:
-                The type of the target column
-            treatment:
-                The column that determines control and test groups
-            timeout:
-                Time limit for the execution of the code
-            n_threads:
-                Maximum number of threads to be used
-            n_folds:
-                Number of folds for cross-validation
-            verbose:
-                Flag to control the verbosity of the process stages
-            generate_report:
-                Flag to control whether to create a report or not
-            report_dir:
-                Directory for storing report files
-            use_algos:
-                List of names of LAMA algorithms for feature selection
-        """
-        self.outcome = outcome
-        self.outcome_type = outcome_type
-        self.treatment = treatment
-        self.use_algos = use_algos
-        self.timeout = timeout
-        self.n_threads = n_threads
-        self.n_folds = n_folds
-        self.verbose = verbose
-        self.generate_report = generate_report
-        self.report_dir = report_dir
-
-    def perform_selection(self, df: pd.DataFrame) -> pd.DataFrame:
-        """Trains a model and returns feature scores.
-
-        This method defines metrics, applies the model, creates a report, and returns feature scores
-
-        Args:
-            df:
-                Input data
-
-        Returns:
-            A DataFrame containing the feature scores from the model
-
-        """
-        roles = {
-            "target": self.outcome,
-            "drop": [self.treatment],
-        }
-
-        if self.outcome_type == "numeric":
-            task_name = "reg"
-            loss = "mse"
-            metric = "mse"
-        elif self.outcome_type == "binary":
-            task_name = "binary"
-            loss = "logloss"
-            metric = "logloss"
-        else:
-            task_name = "multiclass"
-            loss = "crossentropy"
-            metric = "crossentropy"
-
-        features_scores = []
-
-        return features_scores
diff --git a/hypex/selectors/outliers_filter.py b/hypex/selectors/outliers_filter.py
deleted file mode 100644
index ff55cc94..00000000
--- a/hypex/selectors/outliers_filter.py
+++ /dev/null
@@ -1,81 +0,0 @@
-"""Outliers filter."""
-import logging
-
-import pandas as pd
-
-
-logger = logging.getLogger("outliers_filter")
-console_out = logging.StreamHandler()
-logging.basicConfig(
-    handlers=(console_out,),
-    format="[%(asctime)s | %(name)s | %(levelname)s]: %(message)s",
-    datefmt="%d.%m.%Y %H:%M:%S",
-    level=logging.INFO,
-)
-
-
-class OutliersFilter:
-    """Class of the outliers filter. It builds the set of row indices that should be deleted, by percentile."""
-
-    def __init__(self, interquartile_coeff, mode_percentile, min_percentile, max_percentile):
-        """Initializes the OutliersFilter.
-
-        Args:
-            interquartile_coeff:
-                Coefficient for the interquartile range to determine outliers
-            mode_percentile:
-                If True, outliers are determined by custom percentiles
-            min_percentile:
-                The lower percentile. Values below this percentile are considered outliers.
-            max_percentile:
-                The upper percentile. Values above this percentile are considered outliers
-        """
-        self.interquartile_coeff = interquartile_coeff
-        self.mode_percentile = mode_percentile
-        self.min_percentile = min_percentile
-        self.max_percentile = max_percentile
-
-    def perform_filter(self, df: pd.DataFrame, interquartile: bool = True) -> set:
-        """Identifies rows with outliers.
-
-        This method creates a set of row indices to be removed, which contains values less than
-        `min_percentile` and larger than `max_percentile` (if `mode_percentile` is True), or values
-        outside `interquartile_coeff` interquartile ranges around the 0.2 and 0.8 quantiles
-        (if `mode_percentile` is False)
-
-        Args:
-            df:
-                The input DataFrame
-            interquartile:
-                If True, uses the interquartile range to determine outliers.
Defaults to True - - Returns: - The set of row indices with outliers - """ - columns_names = df.select_dtypes(include="number").columns - rows_for_del = [] - for column in columns_names: - if self.mode_percentile: - min_value = df[column].quantile(self.min_percentile) - max_value = df[column].quantile(self.max_percentile) - elif interquartile: - upper_quantile = df[column].quantile(0.8) - lower_quantile = df[column].quantile(0.2) - - interquartile_range = upper_quantile - lower_quantile - min_value = lower_quantile - self.interquartile_coeff * interquartile_range - max_value = upper_quantile + self.interquartile_coeff * interquartile_range - else: - mean_value = df[column].mean() - standard_deviation = df[column].std() - nstd_lower, nstd_upper = 3, 3 - - min_value = mean_value - nstd_lower * standard_deviation - max_value = mean_value + nstd_upper * standard_deviation - - rows_for_del_column = (df[column] < min_value) | (df[column] > max_value) - rows_for_del_column = df.index[rows_for_del_column].tolist() - rows_for_del.extend(rows_for_del_column) - rows_for_del = set(rows_for_del) - logger.info(f"Drop {len(rows_for_del)} rows") - - return rows_for_del diff --git a/hypex/selectors/spearman_filter.py b/hypex/selectors/spearman_filter.py deleted file mode 100644 index cbb47bce..00000000 --- a/hypex/selectors/spearman_filter.py +++ /dev/null @@ -1,72 +0,0 @@ -"""Spearman filter.""" -import logging - -import pandas as pd - -from scipy.stats import spearmanr - - -PVALUE = 0.05 - -logger = logging.getLogger("spearman_filter") -console_out = logging.StreamHandler() -logging.basicConfig( - handlers=(console_out,), - format="[%(asctime)s | %(name)s | %(levelname)s]: %(message)s", - datefmt="%d.%m.%Y %H:%M:%S", - level=logging.INFO, -) - - -class SpearmanFilter: - """Class to filter columns based on the Spearman correlation coefficient. - - The class is utilized to filter dataframe columns that do not exhibit a significant - correlation (based on a provided threshold) with a specified outcome column. - The significance of the correlation is determined using the Spearman correlation coefficient - and a p-value threshold of 0.05 - """ - - def __init__(self, outcome: str, treatment: str, threshold: float): - """Initialize spearman filter. - - Args: - outcome: - The name of target column - treatment: - The name of the column that determines control and test groups - threshold: - The threshold for the Spearman correlation coefficient filter - """ - self.outcome: str = outcome - self.treatment: str = treatment - self.threshold: float = threshold - - def perform_filter(self, df: pd.DataFrame) -> pd.DataFrame: - """Filters columns based on their correlation with the outcome column. - - The method tests the correlation using the Spearman correlation coefficient. 
-        Columns with an absolute correlation coefficient below the provided threshold
-        and a p-value below 0.05 are retained; all other feature columns are considered
-        too correlated with the outcome and are removed from the dataframe
-
-        Args:
-            df:
-                The input DataFrame
-
-        Returns:
-            The filtered DataFrame, containing only columns that
-            are not strongly correlated with the outcome column
-        """
-        selected = []
-        columns = df.drop(columns=[self.treatment, self.outcome]).columns
-        for column in columns:
-            result = spearmanr(df[self.outcome].values, df[column].values)
-            if (abs(result[0]) < self.threshold) and (result[1] < PVALUE):
-                selected.append(column)
-
-        logger.info(f"Drop columns {list(set(columns) - set(selected))}")
-
-        columns = selected + [self.treatment, self.outcome]
-        df = df[columns]
-
-        return df
diff --git a/hypex/utils/__init__.py b/hypex/utils/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/hypex/utils/metrics.py b/hypex/utils/metrics.py
deleted file mode 100644
index 9958be3f..00000000
--- a/hypex/utils/metrics.py
+++ /dev/null
@@ -1,161 +0,0 @@
-"""Calculate metrics."""
-import logging
-
-import numpy as np
-import pandas as pd
-
-from scipy.stats import ks_2samp
-
-from ..utils.psi_pandas import report
-
-
-logger = logging.getLogger("metrics")
-console_out = logging.StreamHandler()
-logging.basicConfig(
-    handlers=(console_out,),
-    format="[%(asctime)s | %(name)s | %(levelname)s]: %(message)s",
-    datefmt="%d.%m.%Y %H:%M:%S",
-    level=logging.INFO,
-)
-
-
-def smd(orig: pd.DataFrame, matched: pd.DataFrame, silent=False) -> pd.DataFrame:
-    """Calculates the standardised mean difference to evaluate matching quality.
-
-    Args:
-        orig:
-            Initial dataframe
-        matched:
-            Matched dataframe
-        silent:
-            If True, logs at debug level instead of info
-
-    Returns:
-        The standardised mean difference between the initial and matched dataframes
-    """
-    smd_data = abs(orig.mean(0) - matched.mean(0)) / orig.std(0)
-
-    if silent:
-        logger.debug(f"Standardised mean difference:\n{smd_data}")
-    else:
-        logger.info(f"Standardised mean difference:\n{smd_data}")
-
-    return smd_data
-
-
-def ks(orig: pd.DataFrame, matched: pd.DataFrame, silent=False) -> dict:
-    """Performs a Kolmogorov-Smirnov test per column to evaluate matching quality.
-
-    Args:
-        orig:
-            Initial dataframe
-        matched:
-            Matched dataframe
-        silent:
-            If True, logs at debug level instead of info
-
-
-    Returns:
-        Dict of p-values
-
-    """
-    ks_dict = dict()
-    matched.columns = orig.columns
-    for col in orig.columns:
-        ks_pval_1 = ks_2samp(orig[col].values, matched[col].values)[1]
-        ks_dict.update({col: ks_pval_1})
-
-    filter_list = list(ks_dict.keys())[:3] + list(ks_dict.keys())[-3:]
-    dict_to_show = {key: val for key, val in ks_dict.items() if key in filter_list}
-
-    if silent:
-        logger.debug(f"Kolmogorov-Smirnov test to check matching quality: \n{dict_to_show}")
-    else:
-        logger.info(f"Kolmogorov-Smirnov test to check matching quality: \n{dict_to_show}")
-
-    return ks_dict
-
-
-def matching_quality(
-    data: pd.DataFrame, treatment: str, features: list, features_psi: list, silent: bool = False
-) -> tuple:
-    """Wraps the functionality for estimating matching quality.
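The two headline metrics above are cheap to reproduce by hand; a sketch with toy frames (standard normal vs. a slightly shifted sample):

import numpy as np
import pandas as pd
from scipy.stats import ks_2samp

rng = np.random.default_rng(5)
orig = pd.DataFrame({"x": rng.normal(0.0, 1.0, 300)})
matched = pd.DataFrame({"x": rng.normal(0.1, 1.0, 300)})

smd_x = (orig.mean(0) - matched.mean(0)).abs() / orig.std(0)  # standardised mean difference
ks_pval = ks_2samp(orig["x"].values, matched["x"].values)[1]  # per-column KS p-value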
- - Args: - data: - The dataframe of matched data - treatment: - The column determining control and test groups - features: - The list of features, ks-test and smd accept only numeric values - features_psi: - The list of features for calculating Population Stability Index (PSI) - silent: - If silent, logger in info mode - - - Returns: - A tuple of dataframes with estimated metrics for matched treated to control and control to treated - - """ - orig_treated = data[data[treatment] == 1][features] - orig_untreated = data[data[treatment] == 0][features] - matched_treated = data[data[treatment] == 1][sorted([f + "_matched" for f in features])] - matched_treated.columns = list(map(lambda x: x.replace("_matched", ""), matched_treated.columns)) - matched_untreated = data[data[treatment] == 0][sorted([f + "_matched" for f in features])] - matched_untreated.columns = list(map(lambda x: x.replace("_matched", ""), matched_untreated.columns)) - - psi_treated = data[data[treatment] == 1][features_psi] - psi_treated_matched = data[data[treatment] == 1][[f + "_matched" for f in features_psi]] - psi_treated_matched.columns = [f + "_treated" for f in features_psi] - psi_treated.columns = [f + "_treated" for f in features_psi] - - psi_untreated = data[data[treatment] == 0][features_psi] - psi_untreated_matched = data[data[treatment] == 0][[f + "_matched" for f in features_psi]] - psi_untreated.columns = [f + "_untreated" for f in features_psi] - psi_untreated_matched.columns = [f + "_untreated" for f in features_psi] - - treated_smd_data = smd(orig_treated, matched_treated, silent) - untreated_smd_data = smd(orig_untreated, matched_untreated, silent) - smd_data = pd.concat([treated_smd_data, untreated_smd_data], axis=1) - smd_data.columns = ["match_control_to_treat", "match_treat_to_control"] - - treated_ks = ks(orig_treated, matched_treated, silent) - untreated_ks = ks(orig_untreated, matched_untreated, silent) - ks_dict = {k: [treated_ks[k], untreated_ks[k]] for k in treated_ks.keys()} - ks_df = pd.DataFrame(data=ks_dict, index=range(2)).T - ks_df.columns = ["match_control_to_treat", "match_treat_to_control"] - - report_cols = ["column", "anomaly_score", "check_result"] - report_psi_treated = report(psi_treated, psi_treated_matched, silent=silent)[report_cols] - report_psi_treated.columns = [col + "_treated" for col in report_cols] - report_psi_untreated = report(psi_untreated, psi_untreated_matched, silent=silent)[report_cols] - report_psi_untreated.columns = [col + "_untreated" for col in report_cols] - report_psi = pd.concat( - [report_psi_treated.reset_index(drop=True), report_psi_untreated.reset_index(drop=True)], axis=1 - ) - - return report_psi, ks_df, smd_data - - -def check_repeats(index: np.array, silent: bool = False) -> float: - """Checks the fraction of duplicated indexes in the given array. 
-
-    Args:
-        index:
-            The array of indexes to check for duplicates
-        silent:
-            If True, logs at debug level instead of info
-
-    Returns:
-        The share of unique indexes (1.0 means there are no duplicates)
-    """
-    unique, counts = np.unique(index, return_counts=True)
-    rep_frac = len(unique) / len(index) if len(unique) > 0 else 0
-
-    if silent:
-        logger.debug(f"Share of unique indexes: {rep_frac: .2f}")
-    else:
-        logger.info(f"Share of unique indexes: {rep_frac: .2f}")
-
-    return round(rep_frac, 2)
diff --git a/hypex/utils/psi_pandas.py b/hypex/utils/psi_pandas.py
deleted file mode 100644
index d9fd1375..00000000
--- a/hypex/utils/psi_pandas.py
+++ /dev/null
@@ -1,497 +0,0 @@
-"""Calculate PSI."""
-import logging
-
-import matplotlib.pyplot as plt
-import numpy as np
-import pandas as pd
-
-
-logger = logging.getLogger("psi_pandas")
-console_out = logging.StreamHandler()
-logging.basicConfig(
-    handlers=(console_out,),
-    format="[%(asctime)s | %(name)s | %(levelname)s]: %(message)s",
-    datefmt="%d.%m.%Y %H:%M:%S",
-    level=logging.INFO,
-)
-
-
-class PSI:
-    """Calculates the population stability index for different categories of data.
-
-    For numeric data the class generates numeric buckets, except when the numeric column
-    includes only NULL. For categorical data:
-        1. For n < 20 categories, a bucket equals the proportion of each category,
-        2. For n > 20 categories, a bucket equals a group of categories,
-        3. For n > 100 categories, it calculates a unique_index based on Jaccard similarity,
-           but in case of an imbalance between null and good data it returns PSI
-
-    Args:
-        expected:
-            The expected values
-        actual:
-            The actual values
-        column_name:
-            The column name for which to calculate the PSI
-        plot:
-            If true, generates a distribution plot. Defaults to False
-
-    Returns:
-        PSI for the column
-        The PSI for each bucket
-        New categories (empty list for non-categorical data)
-        Categories that are absent in the actual column (empty list for non-categorical data)
-    """
-
-    def __init__(
-        self, expected: pd.DataFrame, actual: pd.DataFrame, column_name: str, plot: bool = False, silent=False
-    ):
-        """Initializes the PSI class with the given parameters.
-
-        Args:
-            expected:
-                The expected values
-            actual:
-                The actual values
-            column_name:
-                The column name for which to calculate the PSI
-            plot:
-                If true, generates a distribution plot. Defaults to False
-            silent:
-                If True, logs at debug level instead of info. Defaults to False
-        """
-        self.expected = expected[column_name].values
-        self.actual = actual[column_name].values
-        self.expected_len = len(self.expected)
-        self.actual_len = len(self.actual)
-        self.column_name = column_name
-        self.column_type = self.expected.dtype
-        self.expected_shape = self.expected.shape
-        self.expected_nulls = np.sum(pd.isna(self.expected))
-        self.actual_nulls = np.sum(pd.isna(self.actual))
-        self.axis = 1
-        self.plot = plot
-        self.silent = silent
-        if self.column_type == np.dtype("O"):
-            self.expected_uniqs = expected[column_name].unique()
-            self.actual_uniqs = actual[column_name].unique()
-
-    def jac(self) -> float:
-        """Calculates the Jaccard similarity index.
-
-        The Jaccard similarity index measures the intersection of two sequences
-        versus the union of those two sequences
-
-        Returns:
-            The Jaccard similarity index
-
-        """
-        x = set(self.expected_uniqs)
-        y = set(self.actual_uniqs)
-
-        logger.info(f"Jaccard similarity is {len(x.intersection(y)) / len(x.union(y)): .6f}")
-
-        jac_sim_index = len(x.intersection(y)) / len(x.union(y))
-
-        return jac_sim_index
-
-    # note: this function has no nulls argument, although one used to exist and was used further down in the code
-    def plots(self, expected_percents, actual_percents, breakpoints, intervals):
-        """Plots the expected and actual percents.
-
-        Args:
-            expected_percents:
-                The percentage of each expected value among all expected values
-            actual_percents:
-                The percentage of each actual value among all actual values
-            breakpoints:
-                The list of breakpoints
-            intervals:
-                The list of intervals
-
-        """
-        points = [i for i in breakpoints]
-        plt.figure(figsize=(15, 7))
-        plt.bar(
-            np.arange(len(intervals)) - 0.15,  # what is 0.15? Maybe move it to a constant?
-            expected_percents,
-            label="expected",
-            alpha=0.7,
-            width=0.3,
-        )
-        plt.bar(np.arange(len(intervals)) + 0.15, actual_percents, label="actual", alpha=0.7, width=0.3)
-        plt.legend(loc="best")
-
-        if self.column_type != np.dtype("O"):
-            plt.xticks(range(len(intervals)), intervals, rotation=90)
-        else:
-            plt.xticks(range(len(points)), points, rotation=90)
-        plt.title(self.column_name)
-
-        # plt.savefig(f"C:\\Users\\Glazova2-YA\\Documents\\data\\bip\\summary_psi_plots\\{self.column_name}.png")
-        plt.show()
-
-    def sub_psi(self, e_perc: float, a_perc: float) -> float:
-        """Calculates the sub PSI value for a single bucket.
-
-        Args:
-            e_perc:
-                The expected percentage
-            a_perc:
-                The actual percentage
-
-        Returns:
-            The calculated sub PSI value.
-        """
-        if a_perc == 0:
-            a_perc = 0.0001
-        if e_perc == 0:
-            e_perc = 0.0001
-
-        sub_psi = (e_perc - a_perc) * np.log(e_perc / a_perc)
-
-        logger.debug(f"sub_psi value is {sub_psi: .6f}")
-
-        return sub_psi
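Summed over buckets, `sub_psi` implements the classic PSI = sum((e_i - a_i) * ln(e_i / a_i)); a two-bucket numeric check (toy shares, not library code):

import numpy as np

expected = np.array([0.6, 0.4])  # bucket shares in the reference sample
actual = np.array([0.5, 0.5])    # bucket shares in the new sample
psi = np.sum((expected - actual) * np.log(expected / actual))
# psi is about 0.04 here; a common rule of thumb treats < 0.2 as stable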
-
-    def psi_num(self):
-        """Calculates the PSI for a single numeric variable.
-
-        Returns:
-            PSI for the column
-            The PSI for each bucket
-            New categories (empty list for non-categorical data)
-            Categories that are absent in the actual column (empty list for non-categorical data)
-
-        """
-        buckets = 10
-        breakpoints = np.arange(0, buckets / 10, 0.1)
-
-        # Workaround for the case when the actual table contains values other than null
-        if self.expected_nulls == self.expected_len and self.actual_nulls != self.actual_len:
-            breakpoints = np.array(list(sorted(set(np.nanquantile(self.actual, breakpoints)))))
-        else:
-            breakpoints = np.array(list(sorted(set(np.nanquantile(self.expected, breakpoints)))))
-
-        actual_nulls = self.actual_nulls / self.actual_len
-        expected_nulls = self.expected_nulls / self.expected_len
-
-        breakpoints = np.concatenate(([-np.inf], breakpoints, [np.inf]))
-
-        expected_percents = np.histogram(self.expected, breakpoints)
-        actual_percents = np.histogram(self.actual, breakpoints)
-        # breakpoints[0] = -np.inf
-        # breakpoints[-1] = np.inf
-        expected_percents = [p / self.expected_len for p in expected_percents[0]]
-        actual_percents = [p / self.actual_len for p in actual_percents[0]]
-
-        if self.expected_nulls == 0 and actual_nulls == expected_nulls:
-            expected_percents = expected_percents
-            actual_percents = actual_percents
-            nulls = False
-        else:
-            expected_percents.append(expected_nulls)
-            actual_percents.append(actual_nulls)
-            nulls = True
-
-        points = [i for i in breakpoints]
-        intervals = [f"({np.round(points[i], 5)};{np.round(points[i + 1], 5)})" for i in range(len(points) - 1)]
-        if nulls:
-            intervals = np.append(intervals, "empty_values")
-
-        if self.plot:
-            self.plots(expected_percents, actual_percents, breakpoints, intervals)  # plots() has no nulls argument
-
-        psi_dict = {}
-        for i in range(0, len(expected_percents)):
-            psi_val = self.sub_psi(expected_percents[i], actual_percents[i])
-            psi_dict.update({intervals[i]: psi_val})
-
-        psi_value = np.sum(list(psi_dict.values()))
-        psi_dict = {k: v for k, v in sorted(psi_dict.items(), key=lambda x: x[1], reverse=True)}
-        new_cats = []
-        abs_cats = []
-
-        return psi_value, psi_dict, new_cats, abs_cats
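The numeric branch derives its buckets from decile breakpoints of the reference sample plus open-ended outer bins. Stripped of the null handling, the bucketing is roughly (a sketch; with this much data no bucket is empty, whereas `sub_psi` clamps zero shares to 1e-4):

import numpy as np

rng = np.random.default_rng(6)
expected, actual = rng.normal(size=1000), rng.normal(loc=0.2, size=1000)

deciles = np.nanquantile(expected, np.arange(0, 1.0, 0.1))
edges = np.concatenate(([-np.inf], np.unique(deciles), [np.inf]))
e_perc = np.histogram(expected, edges)[0] / len(expected)
a_perc = np.histogram(actual, edges)[0] / len(actual)
psi = np.sum((e_perc - a_perc) * np.log(e_perc / a_perc))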
-
-    def uniq_psi(self):
-        """Calculates the PSI for categorical data with more than 100 unique values.
-
-        Returns:
-            PSI for the column
-            The PSI for each bucket
-            New categories (empty list for non-categorical data)
-            Categories that are absent in the actual column (empty list for non-categorical data)
-
-        """
-        actual_nulls = self.actual_nulls / self.actual_len
-        expected_nulls = self.expected_nulls / self.expected_len
-
-        actual_not_nulls_arr = self.actual[~np.isnan(self.actual)]
-        expected_not_nulls_arr = self.expected[~np.isnan(self.expected)]
-
-        actual_not_nulls = len(actual_not_nulls_arr) / self.actual_len
-        expected_not_nulls = len(expected_not_nulls_arr) / self.expected_len
-
-        expected_percents = [expected_not_nulls, expected_nulls]
-        actual_percents = [actual_not_nulls, actual_nulls]
-
-        breakpoints = ["good_data", "nulls"]
-        if self.plot:
-            self.plots(expected_percents, actual_percents, breakpoints, breakpoints)  # plots() has no nulls argument
-
-        psi_dict = {}
-        for i in range(0, len(expected_percents)):
-            psi_val = self.sub_psi(expected_percents[i], actual_percents[i])
-            if breakpoints[i] == "None":
-                psi_dict.update({"empty_value": psi_val})
-            else:
-                psi_dict.update({breakpoints[i]: psi_val})
-
-        psi_value = np.sum(list(psi_dict.values()))
-        jac_metric = self.jac()
-        new_cats, abs_cats = [], []
-        psi_dict = {k: v for k, v in sorted(psi_dict.items(), key=lambda x: x[1], reverse=True)}
-
-        if psi_value >= 0.2:  # what is 0.2? Maybe move it to a constant?
-            psi_value = psi_value
-            psi_dict.update({"metric": "stability_index"})
-        else:
-            psi_value = 1 - jac_metric
-            psi_dict.update({"metric": "unique_index"})
-
-        logger.info(f"PSI for categoricals with >100 unique values is {psi_value: .6f}")
-
-        return psi_value, psi_dict, new_cats, abs_cats
-
-    def psi_categ(self):
-        """Calculates the PSI for categorical data, excluding columns with more than 100 unique values.
-
-        Returns:
-            PSI for the column
-            The PSI for each bucket
-            New categories (empty list for non-categorical data)
-            Categories that are absent in the actual column (empty list for non-categorical data)
-
-        """
-        expected_uniq_count = len(self.expected_uniqs)
-        actual_uniq_count = len(self.actual_uniqs)
-        # rule for categoricals with > 100 unique values
-        if expected_uniq_count > 100 or actual_uniq_count > 100:
-            psi_value, psi_dict, new_cats, abs_cats = self.uniq_psi()
-
-            logger.info(f"PSI is {psi_value: .6f}")
-
-            return psi_value, psi_dict, new_cats, abs_cats
-
-        expected_dict = (
-            pd.DataFrame(self.expected, columns=[self.column_name])
-            .groupby(self.column_name)[self.column_name]
-            .count()
-            .sort_values(ascending=False)
-            .to_dict()
-        )
-        actual_dict = (
-            pd.DataFrame(self.actual, columns=[self.column_name])
-            .groupby(self.column_name)[self.column_name]
-            .count()
-            .sort_values(ascending=False)
-            .to_dict()
-        )
-
-        breakpoints = list(expected_dict.keys() | actual_dict.keys())
-
-        new_cats = [k for k in actual_dict.keys() if k not in expected_dict.keys()]
-        abs_cats = [k for k in expected_dict.keys() if k not in actual_dict.keys()]
-
-        expected_dict_re = {}
-        actual_dict_re = {}
-
-        for b in breakpoints:
-            if b in expected_dict and b not in actual_dict:
-                expected_dict_re.update({b: expected_dict[b]})
-                actual_dict_re.update({b: 0})
-            elif b not in expected_dict and b in actual_dict:
-                expected_dict_re.update({b: 0})
-                actual_dict_re.update({b: actual_dict[b]})
-            elif b in expected_dict and b in actual_dict:
-                actual_dict_re.update({b: actual_dict[b]})
-                expected_dict_re.update({b: expected_dict[b]})
-
-        category_names = [c for c in expected_dict_re.keys()]
-        groups = {}
-        g_counts = len(category_names)
-        group_num = 20
-        if g_counts <= group_num:
-            for g_n, val in enumerate(category_names):
-                groups[val] = g_n
-        else:
-            group_size = np.floor(g_counts / group_num)
-            current_pos = 0
-            reminder = g_counts % group_num
-            for g_n in range(group_num):
-                if g_n < group_num - reminder:
-                    group_values = category_names[int(current_pos) : int(current_pos + group_size)]
-                    current_pos += group_size
-                else:
-                    group_values = category_names[int(current_pos) : int(current_pos + group_size + 1)]
-                    current_pos += group_size + 1
-                for val in group_values:
-                    groups[val] = g_n
-        group_sum_exp = 0
-        group_sum_act = 0
-        exp_dict = {}
-        act_dict = {}
-        group_re = -1
-        cat_group_name = ""
-        group_name_re = ""
-        for k, v in groups.items():
-            current_group = v
-            if current_group == group_re:
-                group_re = v
-                exp_dict.pop(group_name_re, None)
-                act_dict.pop(group_name_re, None)
-                cat_group_name = cat_group_name + ", " + str(k)
-                group_sum_exp += expected_dict_re[k]
-                group_sum_act += actual_dict_re[k]
-                exp_dict.update({cat_group_name: group_sum_exp})
-                act_dict.update({cat_group_name: group_sum_act})
-                group_name_re = cat_group_name
-            else:
-                group_name_re = str(k)
-                group_re = v
-                cat_group_name = str(k)
-                group_sum_exp = expected_dict_re[k]
-                group_sum_act = actual_dict_re[k]
-                exp_dict.update({cat_group_name: group_sum_exp})
-                act_dict.update({cat_group_name: group_sum_act})
-
-        expected_percents = [e / self.expected_len for e in exp_dict.values()]
-        actual_percents = [a / self.actual_len for a in act_dict.values()]
-
-        breakpoints = [e for e in exp_dict.keys()]
-
-        if self.plot:
-            self.plots(
-                expected_percents, actual_percents, breakpoints, breakpoints
-            )  # plots() has no nulls argument
-
-        psi_dict = {}
-        for i in range(0, len(expected_percents)):
-            psi_val = self.sub_psi(expected_percents[i], actual_percents[i])
-            if breakpoints[i] == "None":
-                psi_dict.update({"empty_value": psi_val})
-            else:
-                psi_dict.update({breakpoints[i]: psi_val})
-        psi_value = np.sum(list(psi_dict.values()))
-        psi_dict = {k: v for k, v in sorted(psi_dict.items(), key=lambda x: x[1], reverse=True)}
-
-        return psi_value, psi_dict, new_cats, abs_cats
-
-    def psi_result(self):
-        """Calculates the PSI.
-
-        Returns:
-            PSI for the column
-            The PSI for each bucket
-            New categories (empty list for non-categorical data)
-            Categories that are absent in the actual column (empty list for non-categorical data)
-
-        """
-        if len(self.expected_shape) == 1:
-            psi_values = np.empty(len(self.expected_shape))
-        else:
-            psi_values = np.empty(self.expected_shape[self.axis])
-
-        for i in range(0, len(psi_values)):
-            if (self.column_type == np.dtype("O")) or (
-                self.expected_nulls == self.expected_len and self.actual_nulls == self.actual_len
-            ):
-                psi_values, psi_dict, new_cats, abs_cats = self.psi_categ()
-            else:
-                psi_values, psi_dict, new_cats, abs_cats = self.psi_num()
-
-        if self.silent:
-            logger.debug(f"PSI value: {psi_values: .3f}")
-        else:
-            logger.info(f"PSI value: {psi_values: .3f}")
-
-        # if expected_shape is empty, this will raise an error
-        return round(psi_values, 2), psi_dict, new_cats, abs_cats
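Downstream of `psi_result`, the 0.2 threshold is all that separates "OK" from "NOK" in the report below; the check itself is just (hypothetical per-column scores):

scores = {"age": 0.05, "spend": 0.31}  # hypothetical per-column PSI values
checks = {col: ("OK" if score < 0.2 else "NOK") for col, score in scores.items()}
failed = [col for col, status in checks.items() if status == "NOK"]  # ["spend"]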
-
-    def psi_result(self):
-        """Calculates PSI.
-
-        Returns:
-            PSI for column
-            The PSI for each bucket
-            New categories (empty list for non-categorical data)
-            Categories that are absent in actual column (empty list for non-categorical data)
-
-        """
-        if len(self.expected_shape) == 1:
-            psi_values = np.empty(len(self.expected_shape))
-        else:
-            psi_values = np.empty(self.expected_shape[self.axis])
-
-        for i in range(0, len(psi_values)):
-            if (self.column_type == np.dtype("O")) or (
-                self.expected_nulls == self.expected_len and self.actual_nulls == self.actual_len
-            ):
-                psi_values, psi_dict, new_cats, abs_cats = self.psi_categ()
-            else:
-                psi_values, psi_dict, new_cats, abs_cats = self.psi_num()
-
-        if self.silent:
-            logger.debug(f"PSI value: {psi_values: .3f}")
-        else:
-            logger.info(f"PSI value: {psi_values: .3f}")
-
-        # note: this raises an error if expected_shape is empty
-        return round(psi_values, 2), psi_dict, new_cats, abs_cats
-
-
-def report(expected: pd.DataFrame, actual: pd.DataFrame, plot: bool = False, silent: bool = False) -> pd.DataFrame:
-    """Generates a report using the PSI (Population Stability Index) between the expected and actual data.
-
-    Args:
-        expected:
-            The expected (reference) dataset
-        actual:
-            The new dataset to compare against the expected one
-        plot:
-            If True, PSI plots are created. Defaults to False
-        silent:
-            If True, the logger writes in debug mode; otherwise in info mode
-
-    Returns:
-        A dataframe with the PSI report. The report includes the column names,
-        metric names, check results, failed buckets, new categories and disappeared categories.
-        The anomaly score is the PSI itself, the metric name indicates which metric was used
-        for the PSI calculation, the check result indicates whether the PSI is under the
-        threshold (0.2), and the failed buckets list up to 5 buckets with the highest PSI.
-
-    """
-    if silent:
-        logger.debug("Creating report")
-    else:
-        logger.info("Creating report")
-
-    assert len(expected.columns) == len(actual.columns)
-
-    data_cols = expected.columns
-    score_dict = {}
-    new_cat_dict = {}
-    datas = []
-
-    for col in data_cols:
-        psi_res = PSI(expected, actual, col, plot=plot, silent=silent)
-        # debugging aid: on failure, log which column is problematic
-        try:
-            score, psi_dict, new_cats, abs_cats = psi_res.psi_result()
-        except Exception:
-            logger.warning(f"Cannot compute PSI, see column {col}")
-            continue
-
-        if len(new_cats) > 0:
-            new_cat_dict.update({col: new_cats})
-
-        score_dict.update({col: score})
-        check_result = "OK" if score < 0.2 else "NOK"  # TODO: move the 0.2 threshold into a named constant
-        failed_buckets = list(psi_dict.keys())[:5] if score > 0.2 else []
-        if "metric" in psi_dict:
-            new_cats = None
-            abs_cats = None
-            metric_name = psi_dict["metric"]
-            if metric_name == "unique_index":
-                failed_buckets = None
-        else:
-            metric_name = "stability_index"
-        data_tmp = pd.DataFrame(
-            {
-                "column": col,
-                "anomaly_score": score,
-                "metric_name": metric_name,
-                "check_result": check_result,
-                "failed_bucket": f"{failed_buckets}",
-                "new_category": f"{new_cats}",
-                "disappeared_category": f"{abs_cats}",
-            },
-            index=[1],
-        )
-        datas.append(data_tmp)
-
-    data = pd.concat(datas, ignore_index=True)
-
-    return data
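
A hedged usage sketch of the `report()` helper removed above; the column names and distributions here are invented for illustration:

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
expected = pd.DataFrame({"age": rng.normal(40, 10, 1_000)})  # reference snapshot
actual = pd.DataFrame({"age": rng.normal(45, 10, 1_000)})    # new snapshot to check

psi_report = report(expected, actual, plot=False, silent=True)
# One row per column: anomaly_score (the PSI), metric_name, check_result
# ("OK" below 0.2, otherwise "NOK"), failed_bucket, new_category, disappeared_category.
print(psi_report[["column", "anomaly_score", "check_result"]])
```
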
diff --git a/hypex/utils/tutorial_data_creation.py b/hypex/utils/tutorial_data_creation.py
deleted file mode 100644
index 41885305..00000000
--- a/hypex/utils/tutorial_data_creation.py
+++ /dev/null
@@ -1,157 +0,0 @@
-import sys
-from pathlib import Path
-from typing import Iterable, Union
-
-import numpy as np
-import pandas as pd
-
-ROOT = Path('').absolute().parents[0]
-sys.path.append(str(ROOT))
-
-
-def set_nans(
-    data: pd.DataFrame,
-    na_step: Union[Iterable[int], int] = None,
-    nan_cols: Union[Iterable[str], str] = None
-):
-    """Fills some values with NaN.
-
-    Args:
-        data: input dataframe
-        na_step:
-            num or list of nums: the period with which NaNs are inserted (step of range).
-            If a list, it is iterated in the order of the columns
-        nan_cols:
-            name of one or several columns to fill with NaN.
-            If a list, it is iterated in the order of na_step
-
-    Returns:
-        data: dataframe with some NaNs
-    """
-    if (nan_cols is not None) or (na_step is not None):
-        # correct type of columns to iterate
-
-        # number of nans
-        if na_step is None:
-            na_step = [10]
-            print(f'No na_step specified: set to {na_step}')
-        elif not isinstance(na_step, Iterable):
-            na_step = [na_step]
-
-        # columns
-        if nan_cols is None:
-            nan_cols = list(data.columns)
-            print('No nan_cols specified. Setting NaNs applied to all columns')
-        elif not isinstance(nan_cols, Iterable):
-            nan_cols = [nan_cols]
-
-        # align the lengths of the two lists
-        if len(na_step) > len(nan_cols):
-            na_step = na_step[:len(nan_cols)]
-            # print('Length of na_step is bigger than length of columns. Used only first values')  TODO: set to logging
-        elif len(na_step) < len(nan_cols):
-            na_step = na_step + [na_step[-1]] * (len(nan_cols) - len(na_step))
-            # print('Length of na_step is less than length of columns. Used last value several times')
-
-        # create list of indexes to fill with na
-        nans_indexes = [list(range(i, len(data), period)) for i, period in enumerate(na_step)]
-
-        for i in range(len(nan_cols)):
-            try:
-                data.loc[nans_indexes[i], nan_cols[i]] = np.nan
-            except KeyError:
-                print(f'There is no column {nan_cols[i]} in data. No NaNs will be added to this column.')
-    else:
-        print('No NaN added')
-
-    return data
-
-
-def create_test_data(
-    num_users: int = 10000,
-    na_step: Union[Iterable[int], int] = None,
-    nan_cols: Union[Iterable[str], str] = None,
-    file_name: str = None,
-    rs=None
-):
-    """Creates data for the tutorial.
-
-    Args:
-        num_users: number of rows (users)
-        na_step:
-            num or list of nums: the period with which NaNs are inserted (step of range).
-            If a list, it is iterated in the order of the columns
-        nan_cols:
-            name of one or several columns to fill with NaN.
-            If a list, it is iterated in the order of na_step
-        file_name: name of the file to save; the file is not saved if None
-        rs: random seed
-
-    Returns:
-        data: dataframe with the simulated tutorial data
-    """
-    if rs is not None:
-        np.random.seed(rs)
-
-    if (nan_cols is not None) and isinstance(nan_cols, str):
-        nan_cols = [nan_cols]
-    # Simulating dataset with known effect size
-    num_months = 12
-
-    # signup_months == 0 means customer did not sign up
-    signup_months = np.random.choice(np.arange(1, num_months), num_users) * np.random.randint(0, 2, size=num_users)
-
-    data = pd.DataFrame(
-        {
-            "user_id": np.repeat(np.arange(num_users), num_months),
-            "signup_month": np.repeat(signup_months, num_months),  # signup month == 0 means customer did not sign up
-            "month": np.tile(np.arange(1, num_months + 1), num_users),  # months are from 1 to 12
-            "spend": np.random.poisson(500, num_users * num_months),
-        }
-    )
-
-    # A customer is in the treatment group if and only if they signed up
-    data["treat"] = data["signup_month"] > 0
-
-    # Simulating an effect of month (monotonically decreasing -- customers buy less later in the year)
-    data["spend"] = data["spend"] - data["month"] * 10
-
-    # Simulating a simple treatment effect of 100
-    after_signup = (data["signup_month"] < data["month"]) & (data["treat"])
-    data.loc[after_signup, "spend"] = data[after_signup]["spend"] + 100
-
-    # Setting the signup month (for ease of analysis)
-    i = 3
-    data = (
-        data[data.signup_month.isin([0, i])]
-        .groupby(["user_id", "signup_month", "treat"])
-        .apply(
-            lambda x: pd.Series(
-                {"pre_spends": x.loc[x.month < i, "spend"].mean(), "post_spends": x.loc[x.month > i, "spend"].mean()}
-            )
-        )
-        .reset_index()
-    )
-
-    # Additional categorical features
-    gender_i = np.random.choice(a=[0, 1], size=data.user_id.nunique())
-    gender = [["M", "F"][i] for i in gender_i]
-
-    age = np.random.choice(a=range(18, 70), size=data.user_id.nunique())
-
-    industry_i = np.random.choice(a=range(1, 3), size=data.user_id.nunique())
-    industry_names = ["Finance", "E-commerce", "Logistics"]
-    industry = [industry_names[i] for i in industry_i]
-
-    data["age"] = age
-    data["gender"] = gender
-    data["industry"] = industry
-    data["industry"] = data["industry"].astype("str")
-    data["treat"] = data["treat"].astype(int)
-
-    # insert NaNs into the data if needed
-    data = set_nans(data, na_step, nan_cols)
-
-    if file_name is not None:
-        data.to_csv(ROOT / f"{file_name}.csv", index=False)
-
-    return data
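
For reference, this helper was consumed by the test suite further down via `from hypex.utils.tutorial_data_creation import create_test_data`; a short usage sketch (argument values are illustrative):

```python
from hypex.utils.tutorial_data_creation import create_test_data

# 10k users, NaNs inserted every 10th row in two columns, fixed seed for reproducibility
data = create_test_data(num_users=10_000, na_step=10, nan_cols=["age", "gender"], rs=52)
print(data.columns.tolist())
# ['user_id', 'signup_month', 'treat', 'pre_spends', 'post_spends', 'age', 'gender', 'industry']
```
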
diff --git a/hypex/utils/validators.py b/hypex/utils/validators.py
deleted file mode 100644
index 85ddda49..00000000
--- a/hypex/utils/validators.py
+++ /dev/null
@@ -1,91 +0,0 @@
-"""Validators."""
-from typing import List
-
-import numpy as np
-import pandas as pd
-import scipy.stats as st
-
-
-def random_treatment(df: pd.DataFrame, treatment: str):
-    """Replaces the real treatment with a random placebo treatment.
-
-    Args:
-        df:
-            The initial dataframe
-        treatment:
-            The column name representing the treatment
-
-    Returns:
-        The modified dataframe with the original treatment replaced
-        The original treatment series
-        A validation flag
-    """
-    prop1 = df[treatment].sum() / df.shape[0]
-    prop0 = 1 - prop1
-    new_treatment = np.random.choice([0, 1], size=df.shape[0], p=[prop0, prop1])
-    validate = 1
-    orig_treatment = df[treatment]
-    df = df.drop(columns=treatment)
-    df[treatment] = new_treatment
-    return df, orig_treatment, validate
-
-
-def random_feature(df: pd.DataFrame):
-    """Adds a random feature to the initial dataset.
-
-    Args:
-        df:
-            The initial dataframe
-
-    Returns:
-        The modified dataframe with an additional random feature
-        A validation flag
-    """
-    feature = np.random.normal(0, 1, size=len(df))
-    validate = 1
-    df["random_feature"] = feature
-    return df, validate
-
-
-def subset_refuter(df: pd.DataFrame, treatment: str, fraction: float = 0.8):
-    """Returns a subset of the data with the given fraction (default 0.8).
-
-    Args:
-        df:
-            The initial dataframe
-        treatment:
-            The column name representing the treatment
-        fraction:
-            The fraction of the dataset to sample within each treatment group
-
-    Returns:
-        The subset of the dataframe
-        A validation flag
-    """
-    df = df.groupby(treatment, group_keys=False).apply(lambda x: x.sample(frac=fraction))
-    validate = 1
-    return df, validate
-
-
-def test_significance(estimate: float, simulations: List) -> float:
-    """Performs a significance test assuming a normal distribution of the simulated effects.
-
-    Args:
-        estimate:
-            The estimated effect
-        simulations:
-            A list of estimated effects, one from each simulation
-
-    Returns:
-        The p-value of the test
-    """
-    mean_refute_value = np.mean(simulations)
-    std_dev_refute_values = np.std(simulations)
-    z_score = (estimate - mean_refute_value) / std_dev_refute_values
-
-    if z_score > 0:  # Right tail
-        p_value = 1 - st.norm.cdf(z_score)
-    else:  # Left tail
-        p_value = st.norm.cdf(z_score)
-
-    return p_value
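
These validators were designed to be composed into a refutation loop: replace the treatment with a placebo, re-estimate the effect many times, and check that the real estimate stands out. A minimal sketch of that pattern — `estimate_effect` is a hypothetical stand-in for whatever estimator the caller uses:

```python
import pandas as pd


def refute_with_placebo(df: pd.DataFrame, treatment: str, estimate: float, n_simulations: int = 100) -> float:
    """Re-estimate the effect under random placebo treatments and test significance."""
    simulations = []
    for _ in range(n_simulations):
        placebo_df, _, _ = random_treatment(df, treatment)
        simulations.append(estimate_effect(placebo_df, treatment))  # hypothetical estimator
    # If the real estimate is indistinguishable from the placebo distribution,
    # the returned p-value is large and the original effect is suspect.
    return test_significance(estimate, simulations)
```
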
"poetry.core.masonry.api" \ No newline at end of file +build-backend = "poetry.core.masonry.api" + + +[tool.black] +line-length = 120 +include = '\.pyi?$' +exclude = ''' +/( + \.eggs + | \.git + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | _build + | buck-out + | build + | dist +)/ +''' + + +[tool.isort] +profile = "black" +force_single_line = true +atomic = true +include_trailing_comma = true +lines_after_imports = 2 +lines_between_types = 1 +use_parentheses = true +filter_files = true + + +[tool.mypy.hypex] +ignore_missing_imports = false +disallow_untyped_defs = true +disallow_untyped_calls = true +warn_redundant_casts = true +warn_return_any = true +check_untyped_defs = true + +[[tool.mypy.overrides]] +module = "scipy.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "faiss.*" +ignore_missing_imports = true diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/test_aa.py b/tests/test_aa.py deleted file mode 100644 index f5b07cf6..00000000 --- a/tests/test_aa.py +++ /dev/null @@ -1,72 +0,0 @@ -import pandas as pd -import pytest - -from hypex.ab_test.ab_tester import AATest -from hypex.utils.tutorial_data_creation import create_test_data - - -@pytest.fixture -def data(): - return create_test_data(rs=52) - - -@pytest.fixture -def iterations(): - return 20 - - -@pytest.fixture -def info_col(): - return "user_id" - - -def test_aa_simple(data, iterations, info_col): - model = AATest(target_fields=["pre_spends", "post_spends"], info_cols=info_col) - res, datas_dict = model.search_dist_uniform_sampling(data, iterations=iterations) - - assert isinstance(res, pd.DataFrame), "Metrics are not dataframes" - assert res.shape[0] == iterations, ( - "Metrics dataframe contains more or less rows with random states " "(#rows should be equal #of experiments" - ) - assert isinstance(datas_dict, dict), "Result is not dict" - assert len(datas_dict) == iterations, "# of dataframes is not equal # of iterations" - assert all(data.columns) == all( - datas_dict[0].drop(columns=["group"]).columns - ), "Columns in the result are not the same as columns in initial data " - - -def test_aa_group(data, iterations, info_col): - group_cols = "industry" - - model = AATest(target_fields=["pre_spends", "post_spends"], info_cols=info_col, group_cols=group_cols) - res, datas_dict = model.search_dist_uniform_sampling(data, iterations=iterations) - - assert isinstance(res, pd.DataFrame), "Metrics are not dataframes" - assert res.shape[0] == iterations, ( - "Metrics dataframe contains more or less rows with random states " "(#rows should be equal #of experiments" - ) - assert isinstance(datas_dict, dict), "Result is not dict" - assert len(datas_dict) == iterations, "# of dataframes is not equal # of iterations" - assert all(data.columns) == all(datas_dict[0].drop(columns=["group"]).columns), ( - "Columns in the result are not " "the same as columns in initial " "data " - ) - - -def test_aa_quantfields(data, iterations, info_col): - group_cols = "industry" - quant_field = "gender" - - model = AATest( - target_fields=["pre_spends", "post_spends"], info_cols=info_col, group_cols=group_cols, quant_field=quant_field - ) - res, datas_dict = model.search_dist_uniform_sampling(data, iterations=iterations) - - assert isinstance(res, pd.DataFrame), "Metrics are not dataframes" - assert res.shape[0] == iterations, ( - "Metrics dataframe contains more or less rows with random states " "(#rows should be equal #of experiments" - ) - assert 
diff --git a/tests/__init__.py b/tests/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/tests/test_aa.py b/tests/test_aa.py
deleted file mode 100644
index f5b07cf6..00000000
--- a/tests/test_aa.py
+++ /dev/null
@@ -1,72 +0,0 @@
-import pandas as pd
-import pytest
-
-from hypex.ab_test.ab_tester import AATest
-from hypex.utils.tutorial_data_creation import create_test_data
-
-
-@pytest.fixture
-def data():
-    return create_test_data(rs=52)
-
-
-@pytest.fixture
-def iterations():
-    return 20
-
-
-@pytest.fixture
-def info_col():
-    return "user_id"
-
-
-def test_aa_simple(data, iterations, info_col):
-    model = AATest(target_fields=["pre_spends", "post_spends"], info_cols=info_col)
-    res, datas_dict = model.search_dist_uniform_sampling(data, iterations=iterations)
-
-    assert isinstance(res, pd.DataFrame), "Metrics are not a dataframe"
-    assert res.shape[0] == iterations, (
-        "Metrics dataframe contains a different number of rows than random states "
-        "(#rows should equal #experiments)"
-    )
-    assert isinstance(datas_dict, dict), "Result is not a dict"
-    assert len(datas_dict) == iterations, "# of dataframes is not equal to # of iterations"
-    assert all(data.columns) == all(
-        datas_dict[0].drop(columns=["group"]).columns
-    ), "Columns in the result are not the same as columns in the initial data"
-
-
-def test_aa_group(data, iterations, info_col):
-    group_cols = "industry"
-
-    model = AATest(target_fields=["pre_spends", "post_spends"], info_cols=info_col, group_cols=group_cols)
-    res, datas_dict = model.search_dist_uniform_sampling(data, iterations=iterations)
-
-    assert isinstance(res, pd.DataFrame), "Metrics are not a dataframe"
-    assert res.shape[0] == iterations, (
-        "Metrics dataframe contains a different number of rows than random states "
-        "(#rows should equal #experiments)"
-    )
-    assert isinstance(datas_dict, dict), "Result is not a dict"
-    assert len(datas_dict) == iterations, "# of dataframes is not equal to # of iterations"
-    assert all(data.columns) == all(
-        datas_dict[0].drop(columns=["group"]).columns
-    ), "Columns in the result are not the same as columns in the initial data"
-
-
-def test_aa_quantfields(data, iterations, info_col):
-    group_cols = "industry"
-    quant_field = "gender"
-
-    model = AATest(
-        target_fields=["pre_spends", "post_spends"], info_cols=info_col, group_cols=group_cols, quant_field=quant_field
-    )
-    res, datas_dict = model.search_dist_uniform_sampling(data, iterations=iterations)
-
-    assert isinstance(res, pd.DataFrame), "Metrics are not a dataframe"
-    assert res.shape[0] == iterations, (
-        "Metrics dataframe contains a different number of rows than random states "
-        "(#rows should equal #experiments)"
-    )
-    assert isinstance(datas_dict, dict), "Result is not a dict"
-    assert len(datas_dict) == iterations, "# of dataframes is not equal to # of iterations"
-    assert all(data.columns) == all(
-        datas_dict[0].drop(columns=["group"]).columns
-    ), "Columns in the result are not the same as columns in the initial data"
diff --git a/tests/test_ab.py b/tests/test_ab.py
deleted file mode 100644
index 4e9cfa98..00000000
--- a/tests/test_ab.py
+++ /dev/null
@@ -1,92 +0,0 @@
-from hypex.ab_test.ab_tester import ABTest
-
-import pytest
-import pandas as pd
-import numpy as np
-
-DATA_SIZE = 100
-
-
-@pytest.fixture
-def ab_test():
-    return ABTest()
-
-
-@pytest.fixture
-def data():
-    # Generate synthetic data for group A
-    group_a_data = np.random.normal(loc=10, scale=2, size=DATA_SIZE)
-    # Generate synthetic data for group B
-    group_b_data = np.random.normal(loc=12, scale=2, size=DATA_SIZE)
-    group_bp_data = np.random.normal(loc=10, scale=2, size=DATA_SIZE * 2)
-    return pd.DataFrame(
-        {
-            "group": ["control"] * len(group_a_data) + ["test"] * len(group_b_data),
-            "value": list(group_a_data) + list(group_b_data),
-            "previous_value": group_bp_data,
-        }
-    )
-
-
-@pytest.fixture
-def target_field():
-    return "value"
-
-
-@pytest.fixture
-def group_field():
-    return "group"
-
-
-@pytest.fixture
-def previous_value():
-    return "previous_value"
-
-
-@pytest.fixture
-def alpha():
-    return 0.05
-
-
-def test_split_ab(ab_test, data, group_field):
-    result = ab_test.split_ab(data, group_field)
-    assert len(result["test"]) == DATA_SIZE
-    assert len(result["control"]) == DATA_SIZE
-
-
-def test_calc_difference(ab_test, data, group_field, target_field, previous_value):
-    splitted_data = ab_test.split_ab(data, group_field)
-    result = ab_test.calc_difference(splitted_data, target_field, previous_value)
-    assert 1 < result["ate"] < 3
-    assert 1 < result["cuped"] < 3
-    assert 1 < result["diff_in_diff"] < 3
-
-
-def test_calc_difference_with_previous_value(ab_test, data, group_field, target_field, previous_value):
-    ab_test.calc_difference_method = "ate"
-    splitted_data = ab_test.split_ab(data, group_field)
-    result = ab_test.calc_difference(splitted_data, previous_value)
-    assert -1 < result["ate"] < 1
-
-
-def test_calc_p_value(ab_test, data, group_field, target_field, previous_value, alpha):
-    splitted_data = ab_test.split_ab(data, group_field)
-    result = ab_test.calc_p_value(splitted_data, target_field)
-    assert result["t_test"] < alpha
-    assert result["mann_whitney"] < alpha
-
-    result = ab_test.calc_p_value(splitted_data, previous_value)
-    assert result["t_test"] > alpha
-    assert result["mann_whitney"] > alpha
-
-
-def test_execute(ab_test, data, group_field, target_field, previous_value, alpha):
-    result = ab_test.execute(data, target_field, group_field, previous_value)
-    print(result)
-    assert result["size"]["test"] == DATA_SIZE
-    assert result["size"]["control"] == DATA_SIZE
-    assert 1 < result["difference"]["ate"] < 3
-    assert 1 < result["difference"]["cuped"] < 3
-    assert 1 < result["difference"]["diff_in_diff"] < 3
-    assert result["p_value"]["t_test"] < alpha
-    assert result["p_value"]["mann_whitney"] < alpha
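
The `cuped` difference asserted in these tests is the classic CUPED variance-reduction adjustment. A minimal sketch of the formula, independent of the `ABTest` implementation (which is not shown in this patch):

```python
import numpy as np


def cuped_adjust(value: np.ndarray, covariate: np.ndarray) -> np.ndarray:
    """Remove the part of the metric explained by a pre-experiment covariate."""
    theta = np.cov(covariate, value, ddof=0)[0, 1] / np.var(covariate)
    return value - theta * (covariate - covariate.mean())


# The CUPED effect is the difference of adjusted means between test and control;
# with an uninformative covariate (theta close to 0) it reduces to the plain ATE.
```
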
diff --git a/tests/test_example.py b/tests/test_example.py
new file mode 100644
index 00000000..db45a7d3
--- /dev/null
+++ b/tests/test_example.py
@@ -0,0 +1,22 @@
+from typing import Any
+
+import pytest
+
+
+@pytest.mark.parametrize("test_input, expected", [(1, 1), (2, 2), (3, 3)])
+def test_example(test_input: Any, expected: Any) -> None:
+    """
+    Tests that the input values are equal to the expected values.
+
+    This test uses parametrization to check multiple pairs of values.
+    It ensures that each input argument is equal to its expected value.
+
+    Args:
+        test_input: The input value for the test.
+        expected: The expected value to compare against the input.
+
+    Returns:
+        None. The test simply asserts the condition.
+    """
+    assert test_input == expected, f"Expected {expected}, got {test_input}"
+    return
diff --git a/tests/test_matcher.py b/tests/test_matcher.py
deleted file mode 100644
index d3da73b7..00000000
--- a/tests/test_matcher.py
+++ /dev/null
@@ -1,115 +0,0 @@
-import pandas as pd
-import sys
-from pathlib import Path
-
-from hypex import Matcher
-from hypex.utils.tutorial_data_creation import create_test_data
-
-ROOT = Path("").absolute().parents[0]
-sys.path.append(str(ROOT))
-
-
-# TODO: add a date column to the data and fix the related bugs
-# TODO: handle info_col passed as a one-element list or as a plain string
-
-
-def create_model(group_col: str = None):
-    data = pd.read_csv(ROOT / "Tutorial_data.csv")
-    info_col = ["user_id", "signup_month"]
-    outcome = "post_spends"
-    treatment = "treat"
-
-    model = Matcher(input_data=data, outcome=outcome, treatment=treatment, info_col=info_col, group_col=group_col)
-
-    return model
-
-
-def test_matcher_pos():
-    model = create_model()
-    res, quality_res, df_matched = model.estimate()
-
-    assert len(model.quality_result.keys()) == 4, "quality results do not return four metrics"
-    assert list(model.quality_result.keys()) == ["psi", "ks_test", "smd", "repeats"], "metrics renamed"
-
-    assert list(model.results.index) == ["ATE", "ATC", "ATT"], "format of results has changed: types of effects"
-    assert list(model.results.columns) == [
-        "effect_size",
-        "std_err",
-        "p-val",
-        "ci_lower",
-        "ci_upper",
-        "post_spends",
-    ], "format of results has changed: columns in report"
-    assert model.results["p-val"].values[0] <= 0.05, "p-value on ATE is greater than 0.05"
-    assert model.results["p-val"].values[1] <= 0.05, "p-value on ATC is greater than 0.05"
-    assert model.results["p-val"].values[2] <= 0.05, "p-value on ATT is greater than 0.05"
-
-    assert isinstance(res, tuple), "the result of estimate() is not a tuple"
-    assert len(res) == 3, "the tuple does not contain 3 values"
-
-
-def test_matcher_group_pos():
-    model = create_model(group_col="industry")
-    res, quality_res, df_matched = model.estimate()
-
-    assert len(model.quality_result.keys()) == 4, "quality results do not return 4 metrics"
-    assert list(model.quality_result.keys()) == [
-        "psi",
-        "ks_test",
-        "smd",
-        "repeats",
-    ], "metrics renamed, they should be ['psi', 'ks_test', 'smd', 'repeats']"
-
-    assert list(model.results.index) == [
-        "ATE",
-        "ATC",
-        "ATT",
-    ], "format of results has changed: types of effects (ATE, ATC, ATT)"
-    assert list(model.results.columns) == [
-        "effect_size",
-        "std_err",
-        "p-val",
-        "ci_lower",
-        "ci_upper",
-        "post_spends",
-    ], "format of results has changed: columns in report ['effect_size', 'std_err', 'p-val', 'ci_lower', 'ci_upper']"
-    assert model.results["p-val"].values[0] <= 0.05, "p-value on ATE is greater than 0.05"
-    assert model.results["p-val"].values[1] <= 0.05, "p-value on ATC is greater than 0.05"
-    assert model.results["p-val"].values[2] <= 0.05, "p-value on ATT is greater than 0.05"
-
-    assert isinstance(res, tuple), "the result of estimate() is not a tuple"
-    assert len(res) == 3, "the tuple does not contain 3 values"
"treat" - - model = Matcher(input_data=data, outcome=outcome, treatment=treatment, info_col=info_col) - results, quality_results, df_matched = model.estimate() - - assert isinstance(model.estimate(), tuple), "result of function estimate is not tuple" - assert len(model.estimate()) == 3, "tuple does not return 3 values" - assert len(quality_results.keys()) == 4, "quality results return not four metrics" - - -def test_lama_feature_pos(): - model = create_model() - res = model.lama_feature_select() - - assert len(res) > 0, "features return empty" - - -def test_validate_result_pos(): - model = create_model() - model.estimate() - res = model.validate_result() - """ - refuter: str - Refuter type (`random_treatment` , `random_feature` default, `subset_refuter`) - """ - - assert len(res) > 0, "features return empty" - assert list(model.pval_dict.values())[0][1] > 0.05, "p-value on validate results is less than 0.05"